
successfully separated stats to a new package
equals215 committed Aug 6, 2024
1 parent e39c96f commit 4911c86
Showing 24 changed files with 675 additions and 263 deletions.
4 changes: 2 additions & 2 deletions internal/crawl/api.go
@@ -44,7 +44,7 @@ func (crawl *Crawl) startAPI() {
"crawled": crawledSeeds + crawledAssets,
"crawledSeeds": crawledSeeds,
"crawledAssets": crawledAssets,
"queued": crawl.Queue.GetStats().TotalElements,
"queued": stats.GetQueueTotalElementsCount(),
"uptime": time.Since(crawl.StartTime).String(),
}

@@ -57,7 +57,7 @@ func (crawl *Crawl) startAPI() {

http.HandleFunc("/queue", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(crawl.Queue.GetStats())
json.NewEncoder(w).Encode(stats.GetJSONQueueStats())
})

http.HandleFunc("/workers", func(w http.ResponseWriter, r *http.Request) {
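Both handlers now read queue figures through the stats package instead of reaching into crawl.Queue directly, decoupling the HTTP API from the queue implementation. A minimal sketch of how these two accessors might be backed, assuming an atomic counter that the queue keeps updated; only the function names appear in this commit, the internals below are guesses:

// Sketch only: the stats package's real internals are not part of this diff.
package stats

import "sync/atomic"

var queueTotalElements atomic.Int64 // assumed counter, updated by the queue

// GetQueueTotalElementsCount returns the number of elements currently queued.
func GetQueueTotalElementsCount() int64 {
	return queueTotalElements.Load()
}

// GetJSONQueueStats returns a JSON-encodable snapshot for the /queue endpoint.
// The field set is an assumption; the old endpoint encoded crawl.Queue.GetStats().
func GetJSONQueueStats() any {
	return struct {
		TotalElements int64 `json:"totalElements"`
	}{TotalElements: queueTotalElements.Load()}
}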
7 changes: 5 additions & 2 deletions internal/crawl/config.go
@@ -26,8 +26,9 @@ type Crawl struct {
Paused *utils.TAtomBool
Finished *utils.TAtomBool

// Live stats
UseLiveStats bool
// Stats
UseLiveStats bool
stopMonitorWARCWaitGroup chan struct{}

// Logger
Log *log.Logger
@@ -156,7 +157,9 @@ func GenerateCrawlConfig(config *config.Config) (*Crawl, error) {
}
c.Log = customLogger

// Stats
c.UseLiveStats = config.LiveStats
c.stopMonitorWARCWaitGroup = make(chan struct{})

// If the job name isn't specified, we generate a random name
if config.Job == "" {
41 changes: 26 additions & 15 deletions internal/crawl/crawl.go
@@ -31,6 +31,27 @@ func (c *Crawl) Start() (err error) {
c.HQChannelsWg = new(sync.WaitGroup)
regexOutlinks = xurls.Relaxed()

// Init the stats package
// If LiveStats is enabled, launch the goroutine responsible for printing live stats to the standard output
if stats.IsInitialized() {
stats.Reset()
}
ok := stats.Init(&stats.Config{
HandoverUsed:    c.UseHandover,           // Pass whether handover is enabled
LocalDedupeUsed: !c.DisableLocalDedupe,   // Invert the local dedupe disable flag
CDXDedupeUsed:   c.CDXDedupeServer != "", // True if a CDX dedupe server address is set
})
if !ok {
c.Log.Fatal("unable to init stats")
}

stats.SetJob(c.Job)
stats.SetCrawlState("starting")

if c.UseLiveStats {
go stats.Printer()
}

// Setup the --crawl-time-limit clock
if c.CrawlTimeLimit != 0 {
go func() {
@@ -115,6 +136,9 @@
}()
}

// Launch the monitorWARCWaitGroup goroutine
go c.monitorWARCWaitGroup()

c.Log.Info("WARC writer initialized")

// TODO: re-implement host limitation
@@ -140,21 +164,6 @@
// Also starts all the background processes that will handle the workers
c.Workers.Start()

// Init the stats package
// If LiveStats is enabled, launch the goroutine responsible for printing live stats to the standard output
ok := stats.Init(&stats.Config{
HandoverUsed:    c.UseHandover,           // Pass whether handover is enabled
LocalDedupeUsed: !c.DisableLocalDedupe,   // Invert the local dedupe disable flag
CDXDedupeUsed:   c.CDXDedupeServer != "", // True if a CDX dedupe server address is set
})
if !ok {
c.Log.Fatal("unable to init stats")
}

if c.UseLiveStats {
go stats.Printer()
}

// If crawl HQ parameters are specified, then we start the background
// processes responsible for pulling and pushing seeds from and to HQ
if c.UseHQ {
@@ -198,6 +207,8 @@
c.Log.Info("All seeds are now in queue")
}

stats.SetCrawlState("running")

// Start the background process that will catch when there
// is nothing more to crawl
if !c.UseHQ {
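Start() now guards initialization with IsInitialized() and Reset() before calling Init(), which implies the stats package keeps a single process-wide state. A sketch of what that guard might look like, assuming a package-level pointer; only the function names and the Config fields are taken from this diff:

package stats

import "sync/atomic"

// Config mirrors the fields passed in from crawl.Start().
type Config struct {
	HandoverUsed    bool
	LocalDedupeUsed bool
	CDXDedupeUsed   bool
}

type state struct {
	cfg Config
	// counters, job name, crawl state, etc. would live here
}

var global atomic.Pointer[state] // assumed singleton holder

// Init installs the package state and reports false if the package is
// already initialized. A nil config is accepted (the queue tests pass nil).
func Init(cfg *Config) bool {
	if cfg == nil {
		cfg = &Config{}
	}
	return global.CompareAndSwap(nil, &state{cfg: *cfg})
}

// IsInitialized reports whether Init has been called since the last Reset.
func IsInitialized() bool { return global.Load() != nil }

// Reset drops the current state so Init can be called again.
func Reset() { global.Store(nil) }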
7 changes: 6 additions & 1 deletion internal/crawl/finish.go
@@ -21,7 +21,7 @@ func (crawl *Crawl) catchFinish() {

for {
time.Sleep(time.Second * 5)
if !crawl.UseHQ && stats.GetActiveWorkers() == 0 && crawl.Queue.GetStats().TotalElements == 0 && !crawl.Finished.Get() && (stats.GetCrawledSeeds()+stats.GetCrawledAssets() > 0) {
if !crawl.UseHQ && stats.GetActiveWorkers() == 0 && stats.GetQueueTotalElementsCount() == 0 && !crawl.Finished.Get() && (stats.GetCrawledSeeds()+stats.GetCrawledAssets() > 0) {
crawl.Log.Warn("No more items to crawl, finishing..")
crawl.finish()
}
@@ -30,6 +30,7 @@

func (crawl *Crawl) finish() {
crawl.Finished.Set(true)
stats.SetCrawlState("finishing")

crawl.Log.Warn("[QUEUE] Freezing the dequeue")
crawl.Queue.FreezeDequeue()
@@ -55,6 +56,7 @@
}

crawl.Log.Warn("[WARC] Closing writer(s)..")
crawl.stopMonitorWARCWaitGroup <- struct{}{}
crawl.Client.Close()

if crawl.Proxy != "" {
@@ -73,6 +75,9 @@
crawl.Log.Warn("[SEENCHECK] Database closed")
}

// Stop the stats package
stats.Stop()

crawl.Log.Warn("Finished!")

crawl.Log.Warn("Shutting down the logger, bai bai")
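Worth noting: stopMonitorWARCWaitGroup is created with make(chan struct{}), i.e. unbuffered, so the send above blocks until monitorWARCWaitGroup has received it. Once the send returns, the monitor goroutine can no longer touch c.Client.WaitGroup, which makes it safe to call Client.Close() right after. The same stop-channel pattern in isolation (names illustrative):

package main

import "time"

func main() {
	stop := make(chan struct{}) // unbuffered: the send below waits for the receiver
	done := make(chan struct{})

	go func() { // stand-in for monitorWARCWaitGroup
		defer close(done)
		ticker := time.NewTicker(250 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-stop:
				return
			case <-ticker.C:
				// sample a gauge here, e.g. stats.SetWARCWritingQueue(...)
			}
		}
	}()

	stop <- struct{}{} // once this returns, the monitor has stopped sampling
	<-done             // wait for the goroutine to fully exit
}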
3 changes: 2 additions & 1 deletion internal/crawl/hq.go
@@ -9,6 +9,7 @@ import (

"git.archive.org/wb/gocrawlhq"
"github.com/internetarchive/Zeno/internal/queue"
"github.com/internetarchive/Zeno/internal/stats"
"github.com/internetarchive/Zeno/internal/utils"
"github.com/sirupsen/logrus"
)
@@ -161,7 +162,7 @@ func (c *Crawl) HQConsumer() {

// If HQContinuousPull is set to true, we will pull URLs from HQ continuously,
// otherwise we will only pull URLs when needed (and when the crawl is not paused)
for (uint64(c.Queue.GetStats().TotalElements) > uint64(HQBatchSize) && !c.HQContinuousPull) || c.Paused.Get() || c.Queue.HandoverOpen.Get() {
for (uint64(stats.GetQueueTotalElementsCount()) > uint64(HQBatchSize) && !c.HQContinuousPull) || c.Paused.Get() || c.Queue.HandoverOpen.Get() {
time.Sleep(time.Millisecond * 50)
continue
}
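HQConsumer now throttles on the stats package's queue count instead of querying the queue itself. The condition is dense enough that a named helper might read better; a possible refactor sketch (waitForQueueCapacity is hypothetical, not part of this commit):

// waitForQueueCapacity blocks while the local queue already holds more than
// batchSize URLs (unless continuous pull is enabled), or while the crawl is
// paused or the handover channel is open. Hypothetical helper.
func (c *Crawl) waitForQueueCapacity(batchSize int) {
	for (uint64(stats.GetQueueTotalElementsCount()) > uint64(batchSize) && !c.HQContinuousPull) ||
		c.Paused.Get() || c.Queue.HandoverOpen.Get() {
		time.Sleep(50 * time.Millisecond)
	}
}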
2 changes: 1 addition & 1 deletion internal/crawl/log.go
@@ -17,7 +17,7 @@ var constants sync.Map
func (c *Crawl) genLogFields(err interface{}, URL interface{}, additionalFields map[string]interface{}) (fields logrus.Fields) {
fields = logrus.Fields{}

fields["queued"] = c.Queue.GetStats().TotalElements
fields["queued"] = stats.GetQueueTotalElementsCount()
fields["crawled"] = stats.GetCrawledSeeds() + stats.GetCrawledAssets()
fields["rate"] = stats.GetURIPerSecond()
fields["activeWorkers"] = stats.GetActiveWorkers()
5 changes: 5 additions & 0 deletions internal/crawl/utils.go
@@ -8,6 +8,7 @@ import (
"strconv"
"time"

"github.com/internetarchive/Zeno/internal/stats"
"github.com/internetarchive/Zeno/internal/utils"
"github.com/sirupsen/logrus"
)
@@ -53,9 +54,13 @@ func (c *Crawl) handleCrawlPause() {
"Please free some space for the crawler to resume.", c.MinSpaceRequired, spaceLeft))
c.Paused.Set(true)
c.Queue.Paused.Set(true)
stats.SetCrawlState("paused")
} else {
c.Paused.Set(false)
c.Queue.Paused.Set(false)
if stats.GetCrawlState() == "paused" {
stats.SetCrawlState("running")
}
}

time.Sleep(time.Second)
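The resume branch only flips the state back to "running" when the current state is "paused", presumably so that a concurrent transition such as "finishing" is not clobbered by this loop. A sketch of the two accessors, assuming an atomic.Value underneath (internals are guesses, only the function names appear in the diff):

package stats

import "sync/atomic"

// crawlState holds strings such as "starting", "running", "paused", "finishing".
var crawlState atomic.Value

// SetCrawlState records the crawl's current lifecycle state.
func SetCrawlState(state string) { crawlState.Store(state) }

// GetCrawlState returns the last recorded state, or "" before the first Set.
func GetCrawlState() string {
	if s, ok := crawlState.Load().(string); ok {
		return s
	}
	return ""
}

The check-then-set in handleCrawlPause is not atomic, but for a reporting gauge that race is benign.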
15 changes: 15 additions & 0 deletions internal/crawl/warc.go
@@ -3,8 +3,10 @@ package crawl
import (
"fmt"
"path"
"time"

"github.com/CorentinB/warc"
"github.com/internetarchive/Zeno/internal/stats"
"github.com/internetarchive/Zeno/internal/utils"
)

@@ -23,3 +25,16 @@ func (c *Crawl) initWARCRotatorSettings() *warc.RotatorSettings {

return rotatorSettings
}

func (c *Crawl) monitorWARCWaitGroup() {
// Sample the size of the WARC writer's wait group every 250ms
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop() // release the ticker when the monitor exits
for {
select {
case <-c.stopMonitorWARCWaitGroup:
return
case <-ticker.C:
stats.SetWARCWritingQueue(int32(c.Client.WaitGroup.Size()))
}
}
}
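monitorWARCWaitGroup samples c.Client.WaitGroup.Size() four times per second and mirrors the value into the stats package. The standard library's sync.WaitGroup has no Size() method, so the warc client's wait group must be a counting variant; a minimal illustration of such a type (not the library's actual code):

package waitgroup

import (
	"sync"
	"sync/atomic"
)

// CountableWaitGroup behaves like sync.WaitGroup but also reports how many
// goroutines are currently in flight. Illustrative only.
type CountableWaitGroup struct {
	wg    sync.WaitGroup
	count atomic.Int32
}

func (c *CountableWaitGroup) Add(n int) {
	c.count.Add(int32(n))
	c.wg.Add(n)
}

func (c *CountableWaitGroup) Done() {
	c.count.Add(-1)
	c.wg.Done()
}

func (c *CountableWaitGroup) Wait() { c.wg.Wait() }

// Size returns the number of outstanding Add calls not yet matched by Done.
func (c *CountableWaitGroup) Size() int { return int(c.count.Load()) }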
1 change: 1 addition & 0 deletions internal/crawl/worker.go
@@ -71,6 +71,7 @@ func (w *Worker) Run() {
w.state.currentItem = nil
w.state.status = completed
w.logger.Info("Worker stopped")
stats.DecreaseTotalWorkers()
return
default:
for w.pool.Crawl.Paused.Get() {
2 changes: 2 additions & 0 deletions internal/crawl/worker_pool.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/internetarchive/Zeno/internal/stats"
)

type WorkerPool struct {
@@ -34,6 +35,7 @@ func (wp *WorkerPool) Start() {
wp.Crawl.Log.Info("Starting worker", "worker", worker.ID)
go worker.Run()
go worker.WatchHang()
stats.IncreaseTotalWorkers()
}
go wp.WorkerWatcher()
}
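This IncreaseTotalWorkers call pairs with the DecreaseTotalWorkers call in worker.go above: incremented when a worker starts, decremented when its Run() returns, so the counter acts as a live gauge of running workers. The counter side is presumably a simple atomic (sketch, not from the diff):

package stats

import "sync/atomic"

var totalWorkers atomic.Int32 // assumed representation

// IncreaseTotalWorkers and DecreaseTotalWorkers bracket a worker's lifetime.
func IncreaseTotalWorkers() { totalWorkers.Add(1) }
func DecreaseTotalWorkers() { totalWorkers.Add(-1) }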
6 changes: 6 additions & 0 deletions internal/queue/access_test.go
@@ -4,9 +4,13 @@ import (
"os"
"path"
"testing"

"github.com/internetarchive/Zeno/internal/stats"
)

func Test_canEnqueue(t *testing.T) {
stats.Reset()
stats.Init(nil)
tempDir, err := os.MkdirTemp("", "queue_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
@@ -25,6 +29,8 @@
}

func Test_canDequeue(t *testing.T) {
stats.Reset()
stats.Init(nil)
tempDir, err := os.MkdirTemp("", "queue_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
6 changes: 6 additions & 0 deletions internal/queue/dequeue.go
@@ -4,6 +4,7 @@ import (
"fmt"

"github.com/internetarchive/Zeno/internal/queue/index"
"github.com/internetarchive/Zeno/internal/stats"
)

// Dequeue removes and returns the next item from the queue
Expand All @@ -20,6 +21,7 @@ func (q *PersistentGroupedQueue) dequeueNoCommit() (*Item, error) {
if q.HandoverOpen.Get() {
if item, ok := q.handover.tryGet(); ok && item != nil {
q.handoverCount.Add(1)
stats.UpdateHandoverSuccessGetCount(1)
return item.item, nil
}
}
@@ -51,6 +53,7 @@

if position == 0 && size == 0 {
q.Empty.Set(true)
stats.SetQueueEmpty(true)
return nil, ErrQueueEmpty
}

@@ -78,6 +81,7 @@ func (q *PersistentGroupedQueue) dequeueCommitted() (*Item, error) {
if q.HandoverOpen.Get() {
if item, ok := q.handover.tryGet(); ok && item != nil {
q.handoverCount.Add(1)
stats.UpdateHandoverSuccessGetCount(1)
return item.item, nil
}
}
@@ -112,6 +116,7 @@

if position == 0 && size == 0 {
q.Empty.Set(true)
stats.SetQueueEmpty(true)
return nil, ErrQueueEmpty
}

@@ -148,6 +153,7 @@ func (q *PersistentGroupedQueue) getNextHost() (string, error) {
// If there are no hosts, we wait for one to be added
if len(hosts) == 0 {
q.Empty.Set(true)
stats.SetQueueEmpty(true)
return q.getNextHost()
}

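Note the double bookkeeping in the dequeue paths: the queue still maintains its own q.Empty flag and handoverCount for control flow, while stats.SetQueueEmpty and stats.UpdateHandoverSuccessGetCount mirror the same events for reporting. A tiny wrapper could keep each pair in lockstep (hypothetical helper, not in this commit):

// setEmpty flips the queue's own flag and mirrors it into the stats
// package in one place, so the two views cannot drift apart.
func (q *PersistentGroupedQueue) setEmpty(empty bool) {
	q.Empty.Set(empty)
	stats.SetQueueEmpty(empty)
}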
6 changes: 6 additions & 0 deletions internal/queue/dequeue_test.go
@@ -5,9 +5,13 @@ import (
"net/url"
"os"
"testing"

"github.com/internetarchive/Zeno/internal/stats"
)

func TestDequeue(t *testing.T) {
stats.Reset()
stats.Init(nil)
// Create a temporary directory for the queue files
tempDir, err := os.MkdirTemp("", "queue_test")
if err != nil {
@@ -82,6 +86,8 @@
}

func TestDequeueHostOrder(t *testing.T) {
stats.Reset()
stats.Init(nil)
// Create a temporary directory for the queue files
tempDir, err := os.MkdirTemp("", "queue_test")
if err != nil {
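Each test now begins with stats.Reset() followed by stats.Init(nil), since the package-level state would otherwise leak between tests; note that the tests ignore Init's boolean return. A small helper would make that stricter (hypothetical, for a _test.go file that already imports testing and the stats package):

func initStatsForTest(t *testing.T) {
	t.Helper()
	stats.Reset()
	if !stats.Init(nil) {
		t.Fatal("failed to initialize the stats package")
	}
}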
[diff truncated: 11 more changed files not shown]
