diff --git a/log.txt b/log.txt new file mode 100644 index 0000000..e69de29 diff --git a/main.go b/main.go index d1d042d..65b967e 100644 --- a/main.go +++ b/main.go @@ -487,8 +487,9 @@ func resumeCrawler(domain string) { } // checkPause is called inside the crawl loop between requests. -// If a pause signal is pending it blocks until resume arrives. -func checkPause(domain string, br *Broker) { +// If a pause signal is pending (or already buffered) it blocks until resume. +// Returns true if the crawler was paused (and has now resumed). +func checkPause(domain string, br *Broker) bool { pauseChsMu.RLock() pCh := pauseChs[domain] rCh := resumeChs[domain] @@ -496,19 +497,53 @@ func checkPause(domain string, br *Broker) { select { case <-pCh: - setDomainStatus(domain, statusPaused) - emit(br, "paused", map[string]string{"domain": domain}) - log.Printf("[%s] paused", domain) - // drain any duplicate pause signals + // drain any stacked-up duplicate signals for len(pCh) > 0 { <-pCh } - // block until resume + setDomainStatus(domain, statusPaused) + emit(br, "paused", map[string]string{"domain": domain}) + log.Printf("[%s] paused", domain) + // block until resume signal arrives <-rCh setDomainStatus(domain, statusRunning) emit(br, "resumed", map[string]string{"domain": domain}) log.Printf("[%s] resumed", domain) + return true default: + return false + } +} + +// interruptibleSleep sleeps for d but wakes up immediately if a pause signal +// arrives. Returns true if it was interrupted by a pause (caller should re-queue +// the current URL and continue the loop so checkPause can block properly). +func interruptibleSleep(domain string, br *Broker, d time.Duration) bool { + pauseChsMu.RLock() + pCh := pauseChs[domain] + pauseChsMu.RUnlock() + + timer := time.NewTimer(d) + defer timer.Stop() + select { + case <-timer.C: + return false + case <-pCh: + // drain duplicates then block until resumed + for len(pCh) > 0 { + <-pCh + } + pauseChsMu.RLock() + rCh := resumeChs[domain] + pauseChsMu.RUnlock() + setDomainStatus(domain, statusPaused) + emit(br, "paused", map[string]string{"domain": domain}) + log.Printf("[%s] paused (during wait)", domain) + <-rCh + setDomainStatus(domain, statusRunning) + emit(br, "resumed", map[string]string{"domain": domain}) + log.Printf("[%s] resumed", domain) + return true } } @@ -698,17 +733,18 @@ func crawlDomain(domain string, intervalSec int) { continue } - // random delay [interval, interval*2] + // random delay [interval, interval*2] – interruptible by pause delaySec := intervalSec + rand.Intn(intervalSec+1) emit(br, "waiting", map[string]interface{}{ "url": target, "delay_s": delaySec, "queue": queueLen(db), }) - time.Sleep(time.Duration(delaySec) * time.Second) - - // ── pause check after sleep (could have been paused during wait) ── - checkPause(domain, br) + if paused := interruptibleSleep(domain, br, time.Duration(delaySec)*time.Second); paused { + // put the URL back so it is retried after resume + enqueueURL(db, target) + continue + } emit(br, "fetching", map[string]string{"url": target}) resp, err := httpClient.Get(target) @@ -1020,16 +1056,22 @@ func main() { if rows.Scan(&d, &iv, &status) != nil { continue } - // don't restart completed or paused crawls automatically; - // only restart those that were mid-flight (running/pending) + // don't restart completed crawls; paused crawls restart but + // immediately re-enter pause so the user's choice is preserved. if status == statusDone { continue } crawlersMu.Lock() if !crawlers[d] { crawlers[d] = true - // reset status so it shows running - setDomainStatus(d, statusPending) + if status == statusPaused { + // Pre-load a pause signal before the goroutine starts so + // the crawler immediately blocks in its pause state. + ensurePauseChannels(d) + pauseCrawler(d) + } else { + setDomainStatus(d, statusPending) + } go crawlDomain(d, iv) } crawlersMu.Unlock()