diff --git a/cmd/estuary-shuttle/init.go b/cmd/estuary-shuttle/init.go new file mode 100644 index 00000000..e98b193f --- /dev/null +++ b/cmd/estuary-shuttle/init.go @@ -0,0 +1,25 @@ +package main + +import ( + "context" + + "github.com/application-research/estuary/config" + "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" +) + +type Initializer struct { + cfg *config.Config +} + +func (init Initializer) Config() *config.Config { + return init.cfg +} + +func (init Initializer) BlockstoreWrap(blk blockstore.Blockstore) (blockstore.Blockstore, error) { + return blk, nil +} + +func (init Initializer) KeyProviderFunc(context.Context) (<-chan cid.Cid, error) { + return nil, nil +} diff --git a/cmd/estuary-shuttle/main.go b/cmd/estuary-shuttle/main.go index 3b2373d1..afb61d17 100644 --- a/cmd/estuary-shuttle/main.go +++ b/cmd/estuary-shuttle/main.go @@ -237,7 +237,9 @@ func main() { }, } - nd, err := node.Setup(context.TODO(), cfg) + init := Initializer{cfg} + + nd, err := node.Setup(context.TODO(), init) if err != nil { return err } diff --git a/config/config.go b/config/config.go index 502ad732..94574fb7 100644 --- a/config/config.go +++ b/config/config.go @@ -1,12 +1,5 @@ package config -import ( - "context" - - "github.com/ipfs/go-cid" - blockstore "github.com/ipfs/go-ipfs-blockstore" -) - type Config struct { ListenAddrs []string AnnounceAddrs []string @@ -25,10 +18,6 @@ type Config struct { WalletDir string BitswapConfig BitswapConfig - - BlockstoreWrap func(blockstore.Blockstore) (blockstore.Blockstore, error) - - KeyProviderFunc func(context.Context) (<-chan cid.Cid, error) } type BitswapConfig struct { diff --git a/init.go b/init.go new file mode 100644 index 00000000..1bd720a5 --- /dev/null +++ b/init.go @@ -0,0 +1,49 @@ +package main + +import ( + "context" + + "github.com/application-research/estuary/config" + "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" + "gorm.io/gorm" +) + +type Initializer struct { + cfg *config.Config + db *gorm.DB + trackingBstore *TrackingBlockstore +} + +func (init Initializer) Config() *config.Config { + return init.cfg +} + +func (init Initializer) BlockstoreWrap(bs blockstore.Blockstore) (blockstore.Blockstore, error) { + init.trackingBstore = NewTrackingBlockstore(bs, init.db) + return init.trackingBstore, nil +} + +func (init Initializer) KeyProviderFunc(rpctx context.Context) (<-chan cid.Cid, error) { + log.Infof("running key provider func") + out := make(chan cid.Cid) + go func() { + defer close(out) + + var contents []Content + if err := init.db.Find(&contents, "active").Error; err != nil { + log.Errorf("failed to load contents for reproviding: %s", err) + return + } + log.Infof("key provider func returning %d values", len(contents)) + + for _, c := range contents { + select { + case out <- c.Cid.CID: + case <-rpctx.Done(): + return + } + } + }() + return out, nil +} diff --git a/main.go b/main.go index 4e1a9804..dfc0b4f3 100644 --- a/main.go +++ b/main.go @@ -23,7 +23,6 @@ import ( "github.com/google/uuid" "github.com/ipfs/go-cid" gsimpl "github.com/ipfs/go-graphsync/impl" - blockstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log/v2" routed "github.com/libp2p/go-libp2p/p2p/host/routed" "github.com/whyrusleeping/memo" @@ -306,37 +305,9 @@ func main() { defaultReplication = cctx.Int("default-replication") - cfg.KeyProviderFunc = func(rpctx context.Context) (<-chan cid.Cid, error) { - log.Infof("running key provider func") - out := make(chan cid.Cid) - go func() { - defer close(out) - - var contents []Content - if err := db.Find(&contents, "active").Error; err != nil { - log.Errorf("failed to load contents for reproviding: %s", err) - return - } - log.Infof("key provider func returning %d values", len(contents)) - - for _, c := range contents { - select { - case out <- c.Cid.CID: - case <-rpctx.Done(): - return - } - } - }() - return out, nil - } - - var trackingBstore *TrackingBlockstore - cfg.BlockstoreWrap = func(bs blockstore.Blockstore) (blockstore.Blockstore, error) { - trackingBstore = NewTrackingBlockstore(bs, db) - return trackingBstore, nil - } + init := Initializer{cfg, db, nil} - nd, err := node.Setup(context.Background(), cfg) + nd, err := node.Setup(context.Background(), init) if err != nil { return err } @@ -434,7 +405,7 @@ func main() { s.DB = db - cm, err := NewContentManager(db, api, fc, trackingBstore, s.Node.NotifBlockstore, nd.Provider, pinmgr, nd, cctx.String("hostname")) + cm, err := NewContentManager(db, api, fc, init.trackingBstore, s.Node.NotifBlockstore, nd.Provider, pinmgr, nd, cctx.String("hostname")) if err != nil { return err } @@ -450,7 +421,7 @@ func main() { cm.tracer = otel.Tracer("replicator") if cctx.Bool("enable-auto-retrive") { - trackingBstore.SetCidReqFunc(cm.RefreshContentForCid) + init.trackingBstore.SetCidReqFunc(cm.RefreshContentForCid) } if !cctx.Bool("no-storage-cron") { diff --git a/node/node.go b/node/node.go index 9d51531d..51d78b89 100644 --- a/node/node.go +++ b/node/node.go @@ -82,6 +82,12 @@ type EstuaryBlockstore interface { DeleteMany(context.Context, []cid.Cid) error } +type NodeInitializer interface { + BlockstoreWrap(blockstore.Blockstore) (blockstore.Blockstore, error) + KeyProviderFunc(context.Context) (<-chan cid.Cid, error) + Config() *config.Config +} + type Node struct { Dht *dht.IpfsDHT Provider *batched.BatchProvidingSystem @@ -106,7 +112,9 @@ type Node struct { Config *config.Config } -func Setup(ctx context.Context, cfg *config.Config) (*Node, error) { +func Setup(ctx context.Context, init NodeInitializer) (*Node, error) { + + cfg := init.Config() peerkey, err := loadOrInitPeerKey(cfg.Libp2pKeyFile) if err != nil { @@ -202,13 +210,11 @@ func Setup(ctx context.Context, cfg *config.Config) (*Node, error) { } var blkst blockstore.Blockstore = mbs - if cfg.BlockstoreWrap != nil { - wrapper, err := cfg.BlockstoreWrap(blkst) - if err != nil { - return nil, err - } - blkst = wrapper + wrapper, err := init.BlockstoreWrap(blkst) + if err != nil { + return nil, err } + blkst = wrapper bsnet := bsnet.NewFromIpfsHost(h, frt) @@ -241,7 +247,7 @@ func Setup(ctx context.Context, cfg *config.Config) (*Node, error) { } prov, err := batched.New(frt, provq, - batched.KeyProvider(cfg.KeyProviderFunc), + batched.KeyProvider(init.KeyProviderFunc), batched.Datastore(ds), ) if err != nil {