Skip to content

Commit

Permalink
Remove function pointers from Config struct
Browse files Browse the repository at this point in the history
  • Loading branch information
softwareplumber committed Mar 21, 2022
1 parent 3906546 commit f03eef4
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 53 deletions.
25 changes: 25 additions & 0 deletions cmd/estuary-shuttle/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package main

import (
"context"

"github.com/application-research/estuary/config"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)

type Initializer struct {
cfg *config.Config
}

func (init Initializer) Config() *config.Config {
return init.cfg
}

func (init Initializer) BlockstoreWrap(blk blockstore.Blockstore) (blockstore.Blockstore, error) {
return blk, nil
}

func (init Initializer) KeyProviderFunc(context.Context) (<-chan cid.Cid, error) {
return nil, nil
}
4 changes: 3 additions & 1 deletion cmd/estuary-shuttle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ func main() {
},
}

nd, err := node.Setup(context.TODO(), cfg)
init := Initializer{cfg}

nd, err := node.Setup(context.TODO(), init)
if err != nil {
return err
}
Expand Down
11 changes: 0 additions & 11 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
package config

import (
"context"

"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)

type Config struct {
ListenAddrs []string
AnnounceAddrs []string
Expand All @@ -25,10 +18,6 @@ type Config struct {
WalletDir string

BitswapConfig BitswapConfig

BlockstoreWrap func(blockstore.Blockstore) (blockstore.Blockstore, error)

KeyProviderFunc func(context.Context) (<-chan cid.Cid, error)
}

type BitswapConfig struct {
Expand Down
49 changes: 49 additions & 0 deletions init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"context"

"github.com/application-research/estuary/config"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"gorm.io/gorm"
)

type Initializer struct {
cfg *config.Config
db *gorm.DB
trackingBstore *TrackingBlockstore
}

func (init Initializer) Config() *config.Config {
return init.cfg
}

func (init Initializer) BlockstoreWrap(bs blockstore.Blockstore) (blockstore.Blockstore, error) {
init.trackingBstore = NewTrackingBlockstore(bs, init.db)
return init.trackingBstore, nil
}

func (init Initializer) KeyProviderFunc(rpctx context.Context) (<-chan cid.Cid, error) {
log.Infof("running key provider func")
out := make(chan cid.Cid)
go func() {
defer close(out)

var contents []Content
if err := init.db.Find(&contents, "active").Error; err != nil {
log.Errorf("failed to load contents for reproviding: %s", err)
return
}
log.Infof("key provider func returning %d values", len(contents))

for _, c := range contents {
select {
case out <- c.Cid.CID:
case <-rpctx.Done():
return
}
}
}()
return out, nil
}
37 changes: 4 additions & 33 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/google/uuid"
"github.com/ipfs/go-cid"
gsimpl "github.com/ipfs/go-graphsync/impl"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/whyrusleeping/memo"
Expand Down Expand Up @@ -306,37 +305,9 @@ func main() {

defaultReplication = cctx.Int("default-replication")

cfg.KeyProviderFunc = func(rpctx context.Context) (<-chan cid.Cid, error) {
log.Infof("running key provider func")
out := make(chan cid.Cid)
go func() {
defer close(out)

var contents []Content
if err := db.Find(&contents, "active").Error; err != nil {
log.Errorf("failed to load contents for reproviding: %s", err)
return
}
log.Infof("key provider func returning %d values", len(contents))

for _, c := range contents {
select {
case out <- c.Cid.CID:
case <-rpctx.Done():
return
}
}
}()
return out, nil
}

var trackingBstore *TrackingBlockstore
cfg.BlockstoreWrap = func(bs blockstore.Blockstore) (blockstore.Blockstore, error) {
trackingBstore = NewTrackingBlockstore(bs, db)
return trackingBstore, nil
}
init := Initializer{cfg, db, nil}

nd, err := node.Setup(context.Background(), cfg)
nd, err := node.Setup(context.Background(), init)
if err != nil {
return err
}
Expand Down Expand Up @@ -434,7 +405,7 @@ func main() {

s.DB = db

cm, err := NewContentManager(db, api, fc, trackingBstore, s.Node.NotifBlockstore, nd.Provider, pinmgr, nd, cctx.String("hostname"))
cm, err := NewContentManager(db, api, fc, init.trackingBstore, s.Node.NotifBlockstore, nd.Provider, pinmgr, nd, cctx.String("hostname"))
if err != nil {
return err
}
Expand All @@ -450,7 +421,7 @@ func main() {
cm.tracer = otel.Tracer("replicator")

if cctx.Bool("enable-auto-retrive") {
trackingBstore.SetCidReqFunc(cm.RefreshContentForCid)
init.trackingBstore.SetCidReqFunc(cm.RefreshContentForCid)
}

if !cctx.Bool("no-storage-cron") {
Expand Down
22 changes: 14 additions & 8 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ type EstuaryBlockstore interface {
DeleteMany(context.Context, []cid.Cid) error
}

type NodeInitializer interface {
BlockstoreWrap(blockstore.Blockstore) (blockstore.Blockstore, error)
KeyProviderFunc(context.Context) (<-chan cid.Cid, error)
Config() *config.Config
}

type Node struct {
Dht *dht.IpfsDHT
Provider *batched.BatchProvidingSystem
Expand All @@ -106,7 +112,9 @@ type Node struct {
Config *config.Config
}

func Setup(ctx context.Context, cfg *config.Config) (*Node, error) {
func Setup(ctx context.Context, init NodeInitializer) (*Node, error) {

cfg := init.Config()

peerkey, err := loadOrInitPeerKey(cfg.Libp2pKeyFile)
if err != nil {
Expand Down Expand Up @@ -202,13 +210,11 @@ func Setup(ctx context.Context, cfg *config.Config) (*Node, error) {
}

var blkst blockstore.Blockstore = mbs
if cfg.BlockstoreWrap != nil {
wrapper, err := cfg.BlockstoreWrap(blkst)
if err != nil {
return nil, err
}
blkst = wrapper
wrapper, err := init.BlockstoreWrap(blkst)
if err != nil {
return nil, err
}
blkst = wrapper

bsnet := bsnet.NewFromIpfsHost(h, frt)

Expand Down Expand Up @@ -241,7 +247,7 @@ func Setup(ctx context.Context, cfg *config.Config) (*Node, error) {
}

prov, err := batched.New(frt, provq,
batched.KeyProvider(cfg.KeyProviderFunc),
batched.KeyProvider(init.KeyProviderFunc),
batched.Datastore(ds),
)
if err != nil {
Expand Down

0 comments on commit f03eef4

Please sign in to comment.