Skip to content

Commit

Permalink
WIP for corentin
Browse files Browse the repository at this point in the history
  • Loading branch information
equals215 committed Aug 9, 2024
1 parent 81d424c commit 0d29054
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 31 deletions.
1 change: 1 addition & 0 deletions internal/capture/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func Capture(itemToCapture *item.Item) error {

defer func(i *item.Item) {
waitGroup.Wait()
fmt.Println("Capture done")

if packageClient.useHQ && i.ID != "" {
packageClient.hqFinishedChannel <- i
Expand Down
1 change: 1 addition & 0 deletions internal/crawl/crawl.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (c *Crawl) Start() (err error) {
reactor.Init(&reactor.Config{
UseWorkers: true,
WorkerRecvCh: c.Workers.Recv,
WorkerCount: int(c.Workers.Count),
UseQueue: true,
Queue: c.Queue,
UseHQ: c.UseHQ,
Expand Down
2 changes: 1 addition & 1 deletion internal/crawl/finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (crawl *Crawl) finish() {
crawl.Log.Warn("[QUEUE] Freezing the dequeue")
crawl.Queue.FreezeDequeue()

crawl.Log.Warn("[PROCESSER] Stopping the processer")
crawl.Log.Warn("[REACTOR] Stopping the reactor")
reactor.Stop()

crawl.Log.Warn("[WORKERS] Waiting for workers to finish")
Expand Down
8 changes: 6 additions & 2 deletions internal/crawl/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,17 @@ func (c *Crawl) handleCrawlPause() {
logrus.Errorln(fmt.Sprintf("Not enough disk space: %d GB required, %f GB available. "+
"Please free some space for the crawler to resume.", c.MinSpaceRequired, spaceLeft))
c.Paused.Set(true)
hq.Paused.CompareAndSwap(false, true)
if c.UseHQ {
hq.Paused.CompareAndSwap(false, true)
}
c.Queue.Paused.Set(true)
c.Workers.Pause <- struct{}{}
stats.SetCrawlState("paused")
} else {
c.Paused.Set(false)
hq.Paused.CompareAndSwap(true, false)
if c.UseHQ {
hq.Paused.CompareAndSwap(true, false)
}
c.Queue.Paused.Set(false)
c.Workers.Unpause <- struct{}{}
if stats.GetCrawlState() == "paused" {
Expand Down
24 changes: 18 additions & 6 deletions internal/reactor/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ import (
"sync/atomic"

"github.com/internetarchive/Zeno/internal/item"
"github.com/internetarchive/Zeno/internal/log"
"github.com/internetarchive/Zeno/internal/queue"
)

// Config carries the settings handed to Init when the package-level reactor
// is created. Each Use* flag is paired with the channel or queue that the
// reactor is expected to use when that flag is set.
type Config struct {
// UseWorkers, when true, makes Init mark the reactor as worker-enabled.
UseWorkers bool
// WorkerRecvCh is the channel on which workers receive items.
// NOTE(review): presumably this becomes the reactor's worker output
// channel; the assignment is not visible here — confirm in Init.
WorkerRecvCh chan *item.Item
// WorkerCount is used by Init as the buffer capacity of the reactor's
// capture channel and is also stored on the reactor as its worker count.
WorkerCount int

// UseQueue enables the local queue path of the reactor.
UseQueue bool
// Queue is the persistent grouped queue the reactor works against.
// NOTE(review): presumably the queue the reactor dequeues from and
// enqueues to when UseQueue is set — confirm in Init.
Queue *queue.PersistentGroupedQueue

// UseHQ enables forwarding of items to HQ instead of the local queue.
UseHQ bool
// HQProducer is stored by Init as the reactor's HQ output channel.
HQProducer chan *item.Item

// Logger is the base logger; Init derives a fielded logger from it tagged
// with "module": "reactor".
// NOTE(review): Init calls Logger.WithFields unconditionally, so callers
// likely must supply a non-nil Logger — confirm nil handling in the log
// package.
Logger *log.Logger
}

type reactor struct {
Expand Down Expand Up @@ -46,7 +50,11 @@ type reactor struct {
// queueTx chan *item.Item // Output channel to queue

// State
running *atomic.Bool
running *atomic.Bool
workerCount int

// Logger
logger *log.FieldedLogger
}

var (
Expand All @@ -64,8 +72,9 @@ func Init(config *Config) {
includedHosts: []string{},
addIncludedHostsCh: make(chan []string),
rmIncludedHostsCh: make(chan []string),
captureRx: make(chan *item.Item),
captureRx: make(chan *item.Item, config.WorkerCount),
running: new(atomic.Bool),
workerCount: config.WorkerCount,
}
if config.UseWorkers {
reactor.useWorkers = true
Expand All @@ -79,6 +88,7 @@ func Init(config *Config) {
reactor.useHQ = true
reactor.hqTx = config.HQProducer
}
reactor.logger = config.Logger.WithFields(map[string]interface{}{"module": "reactor"})
if !isInit {
packageReactor = reactor
go reactor.run()
Expand Down Expand Up @@ -150,10 +160,12 @@ func (p *reactor) run() {
}
}
case item := <-p.captureRx:
if item == nil || !p.checkHost(item) {
continue
}
p.sendToProducers(item)
go func() {
if item == nil || !p.checkHost(item) {
return
}
p.sendToProducers(item)
}()
default: // TODO : Turn default into a channel receive from queue when queue works with channels
if p.useQueue {
item, err := p.queue.Dequeue()
Expand Down
21 changes: 14 additions & 7 deletions internal/reactor/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@ import (

func (p *reactor) sendToProducers(itemToSend *item.Item) {
switch itemToSend.Type {
case item.TypeOutlink:
case item.TypeSeed:
if p.useHQ {
p.hqTx <- itemToSend
} else {
// TODO: when queue works with channels, send to queue via channel
p.queue.Enqueue(itemToSend)
case item.TypeSeed, item.TypeOutlink:
select {
case p.workersTx <- itemToSend:
return
default:
if p.useHQ {
p.hqTx <- itemToSend
} else {
// TODO: when queue works with channels, send to queue via channel
err := p.queue.Enqueue(itemToSend)
if err != nil {
p.logger.Error("failed to enqueue item", "error", err)
}
}
}
case item.TypeAsset:
if p.useWorkers {
Expand Down
23 changes: 13 additions & 10 deletions internal/worker/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,19 @@ func (b *Bus) run() {
close(consumer.done)
}
case item := <-b.Recv:
b.consumers.Range(func(key, value interface{}) bool {
consumer := value.(*consumer)
select {
case consumer.item <- item:
return false
default:
// Channel is full, skip
}
return true
})
for ok := true; ok; {
b.consumers.Range(func(key, value interface{}) bool {
consumer := value.(*consumer)
select {
case consumer.item <- item:
ok = false
return false
default:
// Channel is full, skip
}
return true
})
}
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package worker

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -64,7 +65,7 @@ func (wp *Pool) newWorker() *Worker {
},
stop: make(chan struct{}),
done: make(chan struct{}),
item: make(chan *item.Item),
item: make(chan *item.Item, 1),

pool: wp,
}
Expand Down Expand Up @@ -94,15 +95,16 @@ func (w *Worker) Run() {
continue
}
if w.pool.canDequeue.Load() {
w.state.lastAction = "waiting for queue to be available"
w.state.status = idle
w.state.lastAction = "waiting for queue to be available"
time.Sleep(10 * time.Millisecond)
continue
}

w.state.lastAction = "waiting for next action"
select {
case <-w.stop:
w.state.lastAction = "stopping worker"
close(w.item)
close(w.stop)
w.state.currentItem = nil
Expand All @@ -114,15 +116,15 @@ func (w *Worker) Run() {
case item := <-w.item:
w.state.lastAction = "got item"
if item == nil {
w.state.lastAction = "item is nil"
w.state.lastError = fmt.Errorf("item is nil")
w.Unlock()
continue
}
// Launches the capture of the given item
w.state.lastAction = "starting capture"
w.unsafeCapture(item)
default:
w.Unlock()
}
w.Unlock()
}
}

Expand Down

0 comments on commit 0d29054

Please sign in to comment.