Skip to content

Commit

Permalink
crawl faster, error out less
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Apr 13, 2023
1 parent fe570b3 commit 2b3b47c
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 7 deletions.
9 changes: 6 additions & 3 deletions events/repostream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,23 @@ func ConsumeRepoStreamLite(ctx context.Context, con *websocket.Conn, cb LiteStre
if err != nil {
e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err)
log.Error(e)
- return nil
+ continue
}

if lexutil.LexLink(rc) != *op.Cid {
// TODO: do we even error here?
return fmt.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid)
}

if err := cb(ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil {
- return err
+ log.Errorf("event consumer callback (%s): %s", ek, err)
+ continue
}

case repomgr.EvtKindDeleteRecord:
if err := cb(ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil {
- return err
+ log.Errorf("event consumer callback (%s): %s", ek, err)
+ continue
}
}
}
Expand Down
14 changes: 11 additions & 3 deletions indexer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package indexer

import (
"context"
+ "fmt"

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/models"
Expand All @@ -20,22 +21,29 @@ type CrawlDispatcher struct {
complete chan util.Uid

doRepoCrawl func(context.Context, *crawlWork) error

+ concurrency int
}

- func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error) *CrawlDispatcher {
+ func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) {
+ if concurrency < 1 {
+ return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency")
+ }

return &CrawlDispatcher{
ingest: make(chan *models.ActorInfo),
repoSync: make(chan *crawlWork),
complete: make(chan util.Uid),
catchup: make(chan *catchupJob),
doRepoCrawl: repoFn,
- }
+ concurrency: concurrency,
+ }, nil
}

func (c *CrawlDispatcher) Run() {
go c.mainLoop()

- for i := 0; i < 3; i++ {
+ for i := 0; i < c.concurrency; i++ {
go c.fetchWorker()
}
}
Expand Down
6 changes: 5 additions & 1 deletion indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,12 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events
}

if crawl {
- ix.Crawler = NewCrawlDispatcher(ix.FetchAndIndexRepo)
+ c, err := NewCrawlDispatcher(ix.FetchAndIndexRepo, 10)
+ if err != nil {
+ return nil, err
+ }

+ ix.Crawler = c
ix.Crawler.Run()
}

Expand Down

0 comments on commit 2b3b47c

Please sign in to comment.