From 8f8ff0e6df0f3f5504f0a823c828d532845255aa Mon Sep 17 00:00:00 2001
From: Jiabao Qu
Date: Sat, 22 Aug 2020 10:28:58 +0800
Subject: [PATCH] feat: support Stop() in queue

---
 queue/queue.go | 27 ++++++++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git a/queue/queue.go b/queue/queue.go
index 7b0404a04..1e2639e87 100644
--- a/queue/queue.go
+++ b/queue/queue.go
@@ -30,7 +30,8 @@ type Queue struct {
 	Threads int
 	storage Storage
 	wake    chan struct{}
-	mut     sync.Mutex // guards wake
+	mut     sync.Mutex // guards wake and running
+	running bool
 }
 
 // InMemoryQueueStorage is the default implementation of the Storage interface.
@@ -62,6 +63,7 @@ func New(threads int, s Storage) (*Queue, error) {
 	return &Queue{
 		Threads: threads,
 		storage: s,
+		running: true,
 	}, nil
 }
 
@@ -122,11 +124,12 @@ func (q *Queue) Size() (int, error) {
 // The given Storage must not be used directly while Run blocks.
 func (q *Queue) Run(c *colly.Collector) error {
 	q.mut.Lock()
-	if q.wake != nil {
+	if q.wake != nil && q.running {
 		q.mut.Unlock()
 		panic("cannot call duplicate Queue.Run")
 	}
 	q.wake = make(chan struct{})
+	q.running = true
 	q.mut.Unlock()
 
 	requestc := make(chan *colly.Request)
@@ -139,6 +142,24 @@ func (q *Queue) Run(c *colly.Collector) error {
 	return <-errc
 }
 
+// Stop signals a running queue to terminate: loop exits the next time it
+// re-evaluates its termination condition, even if requests are still queued.
+// Calling Stop on a queue that is not running has no lasting effect, because
+// Run sets the running flag again when it starts.
+func (q *Queue) Stop() {
+	q.mut.Lock()
+	q.running = false
+	q.mut.Unlock()
+}
+
+// isRunning reports whether Stop has not been called since the queue was
+// created or last started by Run. It takes q.mut so that loop's read of
+// running cannot race with the write in Stop.
+func (q *Queue) isRunning() bool {
+	q.mut.Lock()
+	defer q.mut.Unlock()
+	return q.running
+}
+
 func (q *Queue) loop(c *colly.Collector, requestc chan<- *colly.Request, complete <-chan struct{}, errc chan<- error) {
 	var active int
 	for {
@@ -147,7 +168,7 @@ func (q *Queue) loop(c *colly.Collector, requestc chan<- *colly.Request, complet
 			errc <- err
 			break
 		}
-		if size == 0 && active == 0 {
+		if size == 0 && active == 0 || !q.isRunning() {
 			// Terminate when
 			// 1. No active requests
 			// 2. Emtpy queue