Skip to content

Commit

Permalink
Improve handling of bad records (bluesky-social#124)
Browse files Browse the repository at this point in the history
And some other misc improvements.
  • Loading branch information
whyrusleeping authored Apr 18, 2023
2 parents 073d04c + c467fce commit b96408d
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 59 deletions.
2 changes: 1 addition & 1 deletion bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
// TODO: if the user is already in the 'slow' path, we shouldnt even bother trying to fast path this event

if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, (*cid.Cid)(evt.Prev), evt.Blocks); err != nil {
log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq)
log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", evt.Prev.String(), "commit", evt.Commit.String())
if !errors.Is(err, carstore.ErrRepoBaseMismatch) {
return fmt.Errorf("handle user event failed: %w", err)
}
Expand Down
95 changes: 95 additions & 0 deletions cmd/gosky/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var debugCmd = &cli.Command{
Description: "a set of debugging utilities for atproto",
Subcommands: []*cli.Command{
inspectEventCmd,
debugStreamCmd,
},
}

Expand Down Expand Up @@ -136,3 +137,97 @@ var inspectEventCmd = &cli.Command{
return nil
},
}

type eventInfo struct {
LastCid cid.Cid
LastSeq int64
}

var debugStreamCmd = &cli.Command{
Name: "debug-stream",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "host",
Required: true,
},
&cli.BoolFlag{
Name: "dump-raw-blocks",
},
},
Action: func(cctx *cli.Context) error {
n, err := strconv.Atoi(cctx.Args().First())
if err != nil {
return err
}

h := cctx.String("host")

url := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", h, n-1)
d := websocket.DefaultDialer
con, _, err := d.Dial(url, http.Header{})
if err != nil {
return fmt.Errorf("dial failure: %w", err)
}

infos := make(map[string]*eventInfo)

ctx := context.TODO()
err = events.HandleRepoStream(ctx, con, &events.RepoStreamCallbacks{
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {

fmt.Printf("\rChecking seq: %d ", evt.Seq)

r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
if err != nil {
fmt.Printf("\nEvent at sequence %d had an invalid repo slice: %s\n", evt.Seq, err)
return nil
} else {
prev, err := r.PrevCommit(ctx)
if err != nil {
return err
}

var cs, es string
if prev != nil {
cs = prev.String()
}

if evt.Prev != nil {
es = evt.Prev.String()
}

if cs != es {
fmt.Printf("\nEvent at sequence %d has mismatch between slice prev and struct prev: %s != %s\n", evt.Seq, prev, evt.Prev)
}
}

cur, ok := infos[evt.Repo]
if ok {
if cur.LastCid.String() != evt.Prev.String() {
fmt.Println()
fmt.Printf("Event at sequence %d, repo=%s had prev=%s head=%s, but last commit we saw was %s (seq=%d)\n", evt.Seq, evt.Repo, evt.Prev.String(), evt.Commit.String(), evt.Commit.String(), cur.LastSeq)
}
}

infos[evt.Repo] = &eventInfo{
LastCid: cid.Cid(evt.Commit),
LastSeq: evt.Seq,
}

return nil
},
RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error {
return nil
},
// TODO: all the other Repo* event types
Error: func(evt *events.ErrorFrame) error {
return fmt.Errorf("%s: %s", evt.Error, evt.Message)
},
})
if err != nil {
return err
}

return nil
},
}
2 changes: 1 addition & 1 deletion cmd/gosky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ var readRepoStreamCmd = &cli.Command{
if evt.Prev != nil && evt.Prev.Defined() {
pstr = evt.Prev.String()
}
fmt.Printf("(%d) RepoAppend: %s (%s -> %s)\n", evt.Seq, evt.Repo, pstr, evt.Commit)
fmt.Printf("(%d) RepoAppend: %s (%s -> %s)\n", evt.Seq, evt.Repo, pstr, evt.Commit.String())
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion events/dbpersist.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type RepoEventRecord struct {

type RepoOpRecord struct {
ID uint `gorm:"primarykey"`
RepoEventRecordID uint
RepoEventRecordID uint `gorm:"index"`
Path string
Action string
Rec *util.DbCID
Expand Down
9 changes: 6 additions & 3 deletions events/repostream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,23 @@ func ConsumeRepoStreamLite(ctx context.Context, con *websocket.Conn, cb LiteStre
if err != nil {
e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err)
log.Error(e)
return nil
continue
}

if lexutil.LexLink(rc) != *op.Cid {
// TODO: do we even error here?
return fmt.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid)
}

if err := cb(ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil {
return err
log.Errorf("event consumer callback (%s): %s", ek, err)
continue
}

case repomgr.EvtKindDeleteRecord:
if err := cb(ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil {
return err
log.Errorf("event consumer callback (%s): %s", ek, err)
continue
}
}
}
Expand Down
14 changes: 11 additions & 3 deletions indexer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package indexer

import (
"context"
"fmt"

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/models"
Expand All @@ -20,22 +21,29 @@ type CrawlDispatcher struct {
complete chan util.Uid

doRepoCrawl func(context.Context, *crawlWork) error

concurrency int
}

func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error) *CrawlDispatcher {
func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) {
if concurrency < 1 {
return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency")
}

return &CrawlDispatcher{
ingest: make(chan *models.ActorInfo),
repoSync: make(chan *crawlWork),
complete: make(chan util.Uid),
catchup: make(chan *catchupJob),
doRepoCrawl: repoFn,
}
concurrency: concurrency,
}, nil
}

func (c *CrawlDispatcher) Run() {
go c.mainLoop()

for i := 0; i < 3; i++ {
for i := 0; i < c.concurrency; i++ {
go c.fetchWorker()
}
}
Expand Down
75 changes: 43 additions & 32 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,12 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events
}

if crawl {
ix.Crawler = NewCrawlDispatcher(ix.FetchAndIndexRepo)
c, err := NewCrawlDispatcher(ix.FetchAndIndexRepo, 10)
if err != nil {
return nil, err
}

ix.Crawler = c
ix.Crawler.Run()
}

Expand All @@ -91,36 +95,8 @@ func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent)
Cid: link,
})

switch op.Kind {
case repomgr.EvtKindCreateRecord:
if err := ix.crawlRecordReferences(ctx, &op); err != nil {
return err
}

if ix.doAggregations {
_, err := ix.handleRecordCreate(ctx, evt, &op, true)
if err != nil {
return fmt.Errorf("handle recordCreate: %w", err)
}
}
case repomgr.EvtKindInitActor:
if err := ix.handleInitActor(ctx, evt, &op); err != nil {
return fmt.Errorf("handle initActor: %w", err)
}
case repomgr.EvtKindDeleteRecord:
if ix.doAggregations {
if err := ix.handleRecordDelete(ctx, evt, &op, true); err != nil {
return fmt.Errorf("handle recordDelete: %w", err)
}
}
case repomgr.EvtKindUpdateRecord:
if ix.doAggregations {
if err := ix.handleRecordUpdate(ctx, evt, &op, true); err != nil {
return fmt.Errorf("handle recordCreate: %w", err)
}
}
default:
return fmt.Errorf("unrecognized repo event type: %q", op.Kind)
if err := ix.handleRepoOp(ctx, evt, &op); err != nil {
log.Errorw("failed to handle repo op", "err", err)
}
}

Expand Down Expand Up @@ -156,6 +132,42 @@ func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent)
return nil
}

func (ix *Indexer) handleRepoOp(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error {
switch op.Kind {
case repomgr.EvtKindCreateRecord:
if err := ix.crawlRecordReferences(ctx, op); err != nil {
return err
}

if ix.doAggregations {
_, err := ix.handleRecordCreate(ctx, evt, op, true)
if err != nil {
return fmt.Errorf("handle recordCreate: %w", err)
}
}
case repomgr.EvtKindInitActor:
if err := ix.handleInitActor(ctx, evt, op); err != nil {
return fmt.Errorf("handle initActor: %w", err)
}
case repomgr.EvtKindDeleteRecord:
if ix.doAggregations {
if err := ix.handleRecordDelete(ctx, evt, op, true); err != nil {
return fmt.Errorf("handle recordDelete: %w", err)
}
}
case repomgr.EvtKindUpdateRecord:
if ix.doAggregations {
if err := ix.handleRecordUpdate(ctx, evt, op, true); err != nil {
return fmt.Errorf("handle recordCreate: %w", err)
}
}
default:
return fmt.Errorf("unrecognized repo event type: %q", op.Kind)
}

return nil
}

func (ix *Indexer) handleRecordDelete(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp, local bool) error {
log.Infow("record delete event", "collection", op.Collection)

Expand Down Expand Up @@ -852,7 +864,6 @@ func (ix *Indexer) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error
from = curHead.String()
} else {
span.SetAttributes(attribute.Bool("full", true))

}

// TODO: max size on these? A malicious PDS could just send us a petabyte sized repo here and kill us
Expand Down
32 changes: 16 additions & 16 deletions repomgr/repomgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint,
ctx, span := otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent")
defer span.End()

log.Infof("HandleExternalUserEvent: %d %d %s", pdsid, uid, prev)
log.Infow("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "prev", prev)

unlock := rm.lockUser(ctx, uid)
defer unlock()
Expand Down Expand Up @@ -543,8 +543,6 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint,
return fmt.Errorf("signature check failed: %w", err)
}

log.Infow("external event", "uid", uid)

var pcid cid.Cid
if prev != nil {
pcid = *prev
Expand Down Expand Up @@ -885,28 +883,30 @@ func processOp(ctx context.Context, bs blockstore.Blockstore, op *mst.DiffOp) (*
return nil, err
}

rec, err := lexutil.CborDecodeValue(blk.RawData())
if err != nil {
if errors.Is(err, lexutil.ErrUnrecognizedType) {
log.Warnf("failed processing repo diff: %s", err)
return nil, nil
}

return nil, err
}

kind := EvtKindCreateRecord
if op.Op == "mut" {
kind = EvtKindUpdateRecord
}

return &RepoOp{
outop := &RepoOp{
Kind: kind,
Collection: parts[0],
Rkey: parts[1],
RecCid: &op.NewCid,
Record: rec,
}, nil
}

rec, err := lexutil.CborDecodeValue(blk.RawData())
if err != nil {
if !errors.Is(err, lexutil.ErrUnrecognizedType) {
return nil, err
}

log.Warnf("failed processing repo diff: %s", err)
} else {
outop.Record = rec
}

return outop, nil
case "del":
return &RepoOp{
Kind: EvtKindDeleteRecord,
Expand Down
4 changes: 2 additions & 2 deletions xrpc/xrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ func (c *Client) Do(ctx context.Context, kind XRPCRequestType, inpenc string, me
return fmt.Errorf("reading response body: %w", err)
}
} else {
_, err := io.CopyN(buf, resp.Body, resp.ContentLength)
n, err := io.CopyN(buf, resp.Body, resp.ContentLength)
if err != nil {
return fmt.Errorf("reading length delimited response body: %w", err)
return fmt.Errorf("reading length delimited response body (%d < %d): %w", n, resp.ContentLength, err)
}
}
} else {
Expand Down

0 comments on commit b96408d

Please sign in to comment.