Skip to content

Commit

Permalink
handle updates and pds migrations (bluesky-social#106)
Browse files Browse the repository at this point in the history
basic handling for handle updates and pds migrations.
Theres likely more cases to be handled (migrateTo can be nil, what does
that imply?) but this should be an improvement for now.
  • Loading branch information
whyrusleeping authored Apr 12, 2023
2 parents 69574f4 + 713e21f commit 62ac17d
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 7 deletions.
3 changes: 3 additions & 0 deletions api/plc.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ func (s *PLCServer) CreateDID(ctx context.Context, sigkey *did.PrivKey, recovery
}

return opdid, nil
}

func (s *PLCServer) UpdateUserHandle(ctx context.Context, did string, handle string) error {
return fmt.Errorf("handle updates not yet implemented")
}

func didForCreateOp(op *CreateOp) (string, error) {
Expand Down
51 changes: 49 additions & 2 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"contrib.go.opencensus.io/exporter/prometheus"
atproto "github.com/bluesky-social/indigo/api/atproto"
comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/blobs"
"github.com/bluesky-social/indigo/carstore"
"github.com/bluesky-social/indigo/events"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/bluesky-social/indigo/models"
"github.com/bluesky-social/indigo/plc"
"github.com/bluesky-social/indigo/repomgr"
"github.com/bluesky-social/indigo/util"
bsutil "github.com/bluesky-social/indigo/util"
"github.com/bluesky-social/indigo/xrpc"
"github.com/gorilla/websocket"
Expand Down Expand Up @@ -372,6 +374,25 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
}
}

return nil
case env.RepoHandle != nil:

// TODO: ignoring the data in the message and just going out to the DID doc
act, err := bgs.createExternalUser(ctx, env.RepoHandle.Did)
if err != nil {
return err
}

if act.Handle != env.RepoHandle.Handle {
log.Warnw("handle update did not update handle to asserted value", "did", env.RepoHandle.Did, "expected", env.RepoHandle.Handle, "actual", act.Handle)
}

return nil
case env.RepoMigrate != nil:
if _, err := bgs.createExternalUser(ctx, env.RepoMigrate.Did); err != nil {
return err
}

return nil
default:
return fmt.Errorf("invalid fed event")
Expand Down Expand Up @@ -404,6 +425,7 @@ func (s *BGS) syncUserBlobs(ctx context.Context, pds *models.PDS, user bsutil.Ui
return nil
}

// TODO: rename? This also updates users, and 'external' is an old phrasing
func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.ActorInfo, error) {
ctx, span := otel.Tracer("bgs").Start(ctx, "createExternalUser")
defer span.End()
Expand Down Expand Up @@ -438,7 +460,7 @@ func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.Actor
c := &xrpc.Client{Host: durl.String()}

if peering.ID == 0 {

// TODO: the case of handling a new user on a new PDS probably requires more thought
cfg, err := atproto.ServerDescribeServer(ctx, c)
if err != nil {
// TODO: failing this shouldnt halt our indexing
Expand Down Expand Up @@ -486,7 +508,32 @@ func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.Actor

exu, err := s.Index.LookupUserByDid(ctx, did)
if err == nil {
log.Infof("lost the race to create a new user: %s", did)
log.Infow("lost the race to create a new user", "did", did, "handle", handle)
if exu.PDS != peering.ID {
// User is now on a different PDS, update
if err := s.db.Model(User{}).Where("id = ?", exu.ID).Update("pds", peering.ID).Error; err != nil {
return nil, fmt.Errorf("failed to update users pds: %w", err)
}

}

if exu.Handle != handle {
// Users handle has changed, update
if err := s.db.Model(User{}).Where("id = ?", exu.ID).Update("handle", peering.ID).Error; err != nil {
return nil, fmt.Errorf("failed to update users handle: %w", err)
}

if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{
RepoHandle: &comatproto.SyncSubscribeRepos_Handle{
Did: exu.Did,
Handle: handle,
Time: time.Now().Format(util.ISO8601),
},
}); err != nil {
// TODO: should we really error here? I'm leaning towards no
return nil, fmt.Errorf("failed to push handle update event: %s", err)
}
}
return exu, nil
}

Expand Down
45 changes: 45 additions & 0 deletions bgs/fedmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,51 @@ func (s *Slurper) handleConnection(host *models.PDS, con *websocket.Conn, lastCu

return nil
},
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
log.Infow("got remote handle update event", "host", host.Host, "did", evt.Did, "handle", evt.Handle)
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
RepoHandle: evt,
}); err != nil {
log.Errorf("failed handling event from %q (%d): %s", host.Host, evt.Seq, err)
}
*lastCursor = evt.Seq

if err := s.updateCursor(host, *lastCursor); err != nil {
return fmt.Errorf("updating cursor: %w", err)
}

return nil
},
RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error {
log.Infow("got remote repo migrate event", "host", host.Host, "did", evt.Did, "migrateTo", evt.MigrateTo)
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
RepoMigrate: evt,
}); err != nil {
log.Errorf("failed handling event from %q (%d): %s", host.Host, evt.Seq, err)
}
*lastCursor = evt.Seq

if err := s.updateCursor(host, *lastCursor); err != nil {
return fmt.Errorf("updating cursor: %w", err)
}

return nil
},
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
log.Infow("got remote repo tombstone event", "host", host.Host, "did", evt.Did)
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
RepoTombstone: evt,
}); err != nil {
log.Errorf("failed handling event from %q (%d): %s", host.Host, evt.Seq, err)
}
*lastCursor = evt.Seq

if err := s.updateCursor(host, *lastCursor); err != nil {
return fmt.Errorf("updating cursor: %w", err)
}

return nil
},
RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error {
log.Infow("info event", "name", info.Name, "message", info.Message, "host", host.Host)
return nil
Expand Down
3 changes: 2 additions & 1 deletion cmd/bigsky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
"github.com/bluesky-social/indigo/repomgr"
"github.com/bluesky-social/indigo/version"

_ "github.com/joho/godotenv/autoload"
_ "net/http/pprof"

_ "github.com/joho/godotenv/autoload"

logging "github.com/ipfs/go-log"
"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel"
Expand Down
11 changes: 11 additions & 0 deletions events/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ func HandleRepoStream(ctx context.Context, con *websocket.Conn, cbs *RepoStreamC
} else {
log.Warnf("received repo commit event with nil commit object (seq %d)", evt.Seq)
}
case "#handle":
var evt comatproto.SyncSubscribeRepos_Handle
if err := evt.UnmarshalCBOR(r); err != nil {
return err
}

if cbs.RepoHandle != nil {
if err := cbs.RepoHandle(&evt); err != nil {
return err
}
}
case "#info":
// TODO: this might also be a LabelInfo (as opposed to RepoInfo)
var evt comatproto.SyncSubscribeRepos_Info
Expand Down
9 changes: 7 additions & 2 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func (ix *Indexer) handleRecordDelete(ctx context.Context, evt *repomgr.RepoEven

return nil
}

func (ix *Indexer) handleRecordDeleteFeedLike(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error {
var vr models.VoteRecord
if err := ix.db.Find(&vr, "voter = ? AND rkey = ?", evt.User, op.Rkey).Error; err != nil {
Expand Down Expand Up @@ -705,12 +706,16 @@ func (ix *Indexer) LookupUserByDid(ctx context.Context, did string) (*models.Act
return &ai, nil
}

func (ix *Indexer) lookupUserByHandle(ctx context.Context, handle string) (*models.ActorInfo, error) {
func (ix *Indexer) LookupUserByHandle(ctx context.Context, handle string) (*models.ActorInfo, error) {
var ai models.ActorInfo
if err := ix.db.First(&ai, "handle = ?", handle).Error; err != nil {
if err := ix.db.Find(&ai, "handle = ?", handle).Error; err != nil {
return nil, err
}

if ai.ID == 0 {
return nil, gorm.ErrRecordNotFound
}

return &ai, nil
}

Expand Down
2 changes: 1 addition & 1 deletion models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type RepostRecord struct {
type ActorInfo struct {
gorm.Model
Uid util.Uid `gorm:"uniqueindex"`
Handle string
Handle string `gorm:"uniqueindex"`
DisplayName string
Did string `gorm:"uniqueindex"`
Following int64
Expand Down
11 changes: 10 additions & 1 deletion pds/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,16 @@ func (s *Server) handleAppBskyUnspeccedGetPopular(ctx context.Context, cursor st
}

func (s *Server) handleComAtprotoIdentityUpdateHandle(ctx context.Context, body *comatprototypes.IdentityUpdateHandle_Input) error {
panic("nyi")
if err := s.validateHandle(body.Handle); err != nil {
return err
}

u, err := s.getUser(ctx)
if err != nil {
return err
}

return s.UpdateUserHandle(ctx, u, body.Handle)
}

func (s *Server) handleComAtprotoModerationCreateReport(ctx context.Context, body *comatprototypes.ModerationCreateReport_Input) (*comatprototypes.ModerationCreateReport_Output, error) {
Expand Down
48 changes: 48 additions & 0 deletions pds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/bluesky-social/indigo/notifs"
"github.com/bluesky-social/indigo/plc"
"github.com/bluesky-social/indigo/repomgr"
"github.com/bluesky-social/indigo/util"
bsutil "github.com/bluesky-social/indigo/util"
"github.com/bluesky-social/indigo/xrpc"
gojwt "github.com/golang-jwt/jwt"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/labstack/echo/v4/middleware"
"github.com/lestrrat-go/jwx/v2/jwt"
"github.com/whyrusleeping/go-did"
"golang.org/x/xerrors"
"gorm.io/gorm"
)

Expand Down Expand Up @@ -312,6 +314,15 @@ func (s *Server) RunAPI(listen string) error {

e.HTTPErrorHandler = func(err error, ctx echo.Context) {
fmt.Printf("HANDLER ERROR: (%s) %s\n", ctx.Path(), err)

// TODO: need to properly figure out where http error codes for error
// types get decided. This spot is reasonable, but maybe a bit weird.
// reviewers, please advise
if xerrors.Is(err, ErrNoSuchUser) {
ctx.Response().WriteHeader(404)
return
}

ctx.Response().WriteHeader(500)
}

Expand Down Expand Up @@ -629,3 +640,40 @@ func (s *Server) EventsHandler(c echo.Context) error {

return nil
}

func (s *Server) UpdateUserHandle(ctx context.Context, u *User, handle string) error {
if u.Handle == handle {
// no change? move on
log.Warnw("attempted to change handle to current handle", "did", u.Did, "handle", handle)
return nil
}

_, err := s.indexer.LookupUserByHandle(ctx, handle)
if err == nil {
return fmt.Errorf("handle %q is already in use", handle)
}

if err := s.plc.UpdateUserHandle(ctx, u.Did, handle); err != nil {
return fmt.Errorf("failed to update users handle on plc: %w", err)
}

if err := s.db.Model(models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumn("handle", handle).Error; err != nil {
return fmt.Errorf("failed to update handle: %w", err)
}

if err := s.db.Model(User{}).Where("id = ?", u.ID).UpdateColumn("handle", handle).Error; err != nil {
return fmt.Errorf("failed to update handle: %w", err)
}

if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{
RepoHandle: &comatproto.SyncSubscribeRepos_Handle{
Did: u.Did,
Handle: handle,
Time: time.Now().Format(util.ISO8601),
},
}); err != nil {
return fmt.Errorf("failed to push event: %s", err)
}

return nil
}
8 changes: 8 additions & 0 deletions plc/fakedid.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,11 @@ func (fd *FakeDid) CreateDID(ctx context.Context, sigkey *did.PrivKey, recovery

return d, nil
}

func (fd *FakeDid) UpdateUserHandle(ctx context.Context, did string, nhandle string) error {
if err := fd.db.Model(FakeDidMapping{}).Where("did = ?", did).UpdateColumn("handle", nhandle).Error; err != nil {
return err
}

return nil
}
1 change: 1 addition & 0 deletions plc/plc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
type PLCClient interface {
DidResolver
CreateDID(ctx context.Context, sigkey *did.PrivKey, recovery string, handle string, service string) (string, error)
UpdateUserHandle(ctx context.Context, didstr string, nhandle string) error
}

type DidResolver interface {
Expand Down
30 changes: 30 additions & 0 deletions testing/integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,33 @@ func TestBGSMultiGap(t *testing.T) {
t.Fatal(err)
}
}

func TestHandleChange(t *testing.T) {
//t.Skip("test too sleepy to run in CI for now")
assert := assert.New(t)
_ = assert
didr := testPLC(t)
p1 := mustSetupPDS(t, "localhost:5385", ".pdsuno", didr)
p1.Run(t)

b1 := mustSetupBGS(t, "localhost:8391", didr)
b1.Run(t)

p1.RequestScraping(t, b1)
time.Sleep(time.Millisecond * 50)

evts := b1.Events(t, -1)

u := p1.MustNewUser(t, usernames[0]+".pdsuno")

//socialSim(t, []*testUser{u}, 10, 0)

u.ChangeHandle(t, "catbear.pdsuno")

time.Sleep(time.Millisecond * 100)

initevt := evts.Next()
fmt.Println(initevt.RepoCommit)
hcevt := evts.Next()
fmt.Println(hcevt.RepoHandle)
}
18 changes: 18 additions & 0 deletions testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,17 @@ func (u *testUser) GetNotifs(t *testing.T) []*bsky.NotificationListNotifications
return resp.Notifications
}

func (u *testUser) ChangeHandle(t *testing.T, nhandle string) {
t.Helper()

ctx := context.TODO()
if err := atproto.IdentityUpdateHandle(ctx, u.client, &atproto.IdentityUpdateHandle_Input{
Handle: nhandle,
}); err != nil {
t.Fatal(err)
}
}

func testPLC(t *testing.T) *plc.FakeDid {
// TODO: just do in memory...
tdir, err := os.MkdirTemp("", "plcserv")
Expand Down Expand Up @@ -447,6 +458,13 @@ func (b *testBGS) Events(t *testing.T, since int64) *eventStream {
es.lk.Unlock()
return nil
},
RepoHandle: func(evt *atproto.SyncSubscribeRepos_Handle) error {
fmt.Println("received handle event: ", evt.Seq, evt.Did)
es.lk.Lock()
es.events = append(es.events, &events.XRPCStreamEvent{RepoHandle: evt})
es.lk.Unlock()
return nil
},
}); err != nil {
fmt.Println(err)
}
Expand Down

0 comments on commit 62ac17d

Please sign in to comment.