sqlite
This commit is contained in:
74
main.go
74
main.go
@@ -487,8 +487,9 @@ func resumeCrawler(domain string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// checkPause is called inside the crawl loop between requests.
|
// checkPause is called inside the crawl loop between requests.
|
||||||
// If a pause signal is pending it blocks until resume arrives.
|
// If a pause signal is pending (or already buffered) it blocks until resume.
|
||||||
func checkPause(domain string, br *Broker) {
|
// Returns true if the crawler was paused (and has now resumed).
|
||||||
|
func checkPause(domain string, br *Broker) bool {
|
||||||
pauseChsMu.RLock()
|
pauseChsMu.RLock()
|
||||||
pCh := pauseChs[domain]
|
pCh := pauseChs[domain]
|
||||||
rCh := resumeChs[domain]
|
rCh := resumeChs[domain]
|
||||||
@@ -496,19 +497,53 @@ func checkPause(domain string, br *Broker) {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-pCh:
|
case <-pCh:
|
||||||
setDomainStatus(domain, statusPaused)
|
// drain any stacked-up duplicate signals
|
||||||
emit(br, "paused", map[string]string{"domain": domain})
|
|
||||||
log.Printf("[%s] paused", domain)
|
|
||||||
// drain any duplicate pause signals
|
|
||||||
for len(pCh) > 0 {
|
for len(pCh) > 0 {
|
||||||
<-pCh
|
<-pCh
|
||||||
}
|
}
|
||||||
// block until resume
|
setDomainStatus(domain, statusPaused)
|
||||||
|
emit(br, "paused", map[string]string{"domain": domain})
|
||||||
|
log.Printf("[%s] paused", domain)
|
||||||
|
// block until resume signal arrives
|
||||||
<-rCh
|
<-rCh
|
||||||
setDomainStatus(domain, statusRunning)
|
setDomainStatus(domain, statusRunning)
|
||||||
emit(br, "resumed", map[string]string{"domain": domain})
|
emit(br, "resumed", map[string]string{"domain": domain})
|
||||||
log.Printf("[%s] resumed", domain)
|
log.Printf("[%s] resumed", domain)
|
||||||
|
return true
|
||||||
default:
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// interruptibleSleep sleeps for d but wakes up immediately if a pause signal
|
||||||
|
// arrives. Returns true if it was interrupted by a pause (caller should re-queue
|
||||||
|
// the current URL and continue the loop so checkPause can block properly).
|
||||||
|
func interruptibleSleep(domain string, br *Broker, d time.Duration) bool {
|
||||||
|
pauseChsMu.RLock()
|
||||||
|
pCh := pauseChs[domain]
|
||||||
|
pauseChsMu.RUnlock()
|
||||||
|
|
||||||
|
timer := time.NewTimer(d)
|
||||||
|
defer timer.Stop()
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
return false
|
||||||
|
case <-pCh:
|
||||||
|
// drain duplicates then block until resumed
|
||||||
|
for len(pCh) > 0 {
|
||||||
|
<-pCh
|
||||||
|
}
|
||||||
|
pauseChsMu.RLock()
|
||||||
|
rCh := resumeChs[domain]
|
||||||
|
pauseChsMu.RUnlock()
|
||||||
|
setDomainStatus(domain, statusPaused)
|
||||||
|
emit(br, "paused", map[string]string{"domain": domain})
|
||||||
|
log.Printf("[%s] paused (during wait)", domain)
|
||||||
|
<-rCh
|
||||||
|
setDomainStatus(domain, statusRunning)
|
||||||
|
emit(br, "resumed", map[string]string{"domain": domain})
|
||||||
|
log.Printf("[%s] resumed", domain)
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -698,17 +733,18 @@ func crawlDomain(domain string, intervalSec int) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// random delay [interval, interval*2]
|
// random delay [interval, interval*2] – interruptible by pause
|
||||||
delaySec := intervalSec + rand.Intn(intervalSec+1)
|
delaySec := intervalSec + rand.Intn(intervalSec+1)
|
||||||
emit(br, "waiting", map[string]interface{}{
|
emit(br, "waiting", map[string]interface{}{
|
||||||
"url": target,
|
"url": target,
|
||||||
"delay_s": delaySec,
|
"delay_s": delaySec,
|
||||||
"queue": queueLen(db),
|
"queue": queueLen(db),
|
||||||
})
|
})
|
||||||
time.Sleep(time.Duration(delaySec) * time.Second)
|
if paused := interruptibleSleep(domain, br, time.Duration(delaySec)*time.Second); paused {
|
||||||
|
// put the URL back so it is retried after resume
|
||||||
// ── pause check after sleep (could have been paused during wait) ──
|
enqueueURL(db, target)
|
||||||
checkPause(domain, br)
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
emit(br, "fetching", map[string]string{"url": target})
|
emit(br, "fetching", map[string]string{"url": target})
|
||||||
resp, err := httpClient.Get(target)
|
resp, err := httpClient.Get(target)
|
||||||
@@ -1020,16 +1056,22 @@ func main() {
|
|||||||
if rows.Scan(&d, &iv, &status) != nil {
|
if rows.Scan(&d, &iv, &status) != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// don't restart completed or paused crawls automatically;
|
// don't restart completed crawls; paused crawls restart but
|
||||||
// only restart those that were mid-flight (running/pending)
|
// immediately re-enter pause so the user's choice is preserved.
|
||||||
if status == statusDone {
|
if status == statusDone {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
crawlersMu.Lock()
|
crawlersMu.Lock()
|
||||||
if !crawlers[d] {
|
if !crawlers[d] {
|
||||||
crawlers[d] = true
|
crawlers[d] = true
|
||||||
// reset status so it shows running
|
if status == statusPaused {
|
||||||
setDomainStatus(d, statusPending)
|
// Pre-load a pause signal before the goroutine starts so
|
||||||
|
// the crawler immediately blocks in its pause state.
|
||||||
|
ensurePauseChannels(d)
|
||||||
|
pauseCrawler(d)
|
||||||
|
} else {
|
||||||
|
setDomainStatus(d, statusPending)
|
||||||
|
}
|
||||||
go crawlDomain(d, iv)
|
go crawlDomain(d, iv)
|
||||||
}
|
}
|
||||||
crawlersMu.Unlock()
|
crawlersMu.Unlock()
|
||||||
|
|||||||
Reference in New Issue
Block a user