Skip to content

Commit

Permalink
updated core structure
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumemichel committed Aug 23, 2024
1 parent dad2e31 commit 9daff28
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 9 deletions.
14 changes: 10 additions & 4 deletions ant.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
kad "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/probe-lab/go-libdht/kad/key/bit256"
)
Expand All @@ -17,7 +18,7 @@ const (
)

type Ant struct {
host host.Host
Host host.Host
dht *kad.IpfsDHT

closeChan chan struct{}
Expand All @@ -26,6 +27,9 @@ type Ant struct {
}

func SpawnAnt(ctx context.Context, privKey crypto.PrivKey) (*Ant, error) {

pid, _ := peer.IDFromPrivateKey(privKey)
logger.Debugf("spawning ant. kadid: %s, peerid: %s", PeeridToKadid(pid).HexString(), pid)
// TODO: edit libp2p host for cloud deployment
h, err := libp2p.New(
libp2p.UserAgent("celestia-celestia"),
Expand All @@ -34,6 +38,7 @@ func SpawnAnt(ctx context.Context, privKey crypto.PrivKey) (*Ant, error) {
libp2p.DisableRelay(),
)
if err != nil {
logger.Warn("unable to create libp2p host: ", err)
return nil, err
}

Expand All @@ -44,17 +49,18 @@ func SpawnAnt(ctx context.Context, privKey crypto.PrivKey) (*Ant, error) {
}
dht, err := kad.New(ctx, h, dhtOpts...)
if err != nil {
logger.Warn("unable to create libp2p dht: ", err)
return nil, err
}

ant := &Ant{
host: h,
Host: h,
dht: dht,
closeChan: make(chan struct{}, 1),
KadId: PeeridToKadid(h.ID()),
}

ant.run(ctx)
go ant.run(ctx)

return ant, nil
}
Expand All @@ -80,5 +86,5 @@ func (a *Ant) Close() error {
if err != nil {
return err
}
return a.host.Close()
return a.Host.Close()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ require (
go.uber.org/fx v1.22.1 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/mod v0.19.0 // indirect
Expand Down
20 changes: 16 additions & 4 deletions db.go → nebuladb.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,24 @@ type NebulaDB struct {
connPool *pgxpool.Pool
}

func NewDB(connString string) *NebulaDB {
func NewNebulaDB(connString string) *NebulaDB {
return &NebulaDB{
ConnString: connString,
}
}

func (db *NebulaDB) Open(ctx context.Context) error {
if db.connPool != nil {
return nil
}
connPool, err := pgxpool.New(ctx, db.ConnString)
if err == nil {
db.connPool = connPool
if err != nil {
logger.Warn("unable to open connection to Nebula DB: ", err)
return err
}
return err
logger.Debug("opened connection to Nebula DB")
db.connPool = connPool
return nil
}

func (db *NebulaDB) Close() {
Expand All @@ -43,6 +49,8 @@ func (db *NebulaDB) GetLatestPeerIds(ctx context.Context) ([]peer.ID, error) {
defer db.Close()
}

logger.Debug("getting last crawl from Nebula DB")

crawlIdQuery := `
SELECT c.id
FROM crawls c
Expand All @@ -55,6 +63,7 @@ func (db *NebulaDB) GetLatestPeerIds(ctx context.Context) ([]peer.ID, error) {
var crawlId uint64
err := db.connPool.QueryRow(ctx, crawlIdQuery, crawlIntervalAgo).Scan(&crawlId)
if err != nil {
logger.Warn("unable to get last crawl from Nebula DB: ", err)
return nil, err
}

Expand All @@ -70,6 +79,7 @@ func (db *NebulaDB) GetLatestPeerIds(ctx context.Context) ([]peer.ID, error) {
beforeLastCrawlStarted := crawlIntervalAgo.Add(-CRAWL_INTERVAL)
rows, err := db.connPool.Query(ctx, peersQuery, beforeLastCrawlStarted, crawlId)
if err != nil {
logger.Warn("unable to get peers from Nebula DB: ", err)
return nil, err
}

Expand All @@ -87,5 +97,7 @@ func (db *NebulaDB) GetLatestPeerIds(ctx context.Context) ([]peer.ID, error) {
peerIds = append(peerIds, peerId)
}

logger.Debugf("found %d peers during the last Nebula crawl", len(peerIds))

return peerIds, nil
}
21 changes: 21 additions & 0 deletions queen.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,22 @@ type Queen struct {
ants []*Ant
}

func NewQueen(dbConnString string, keysDbPath string) *Queen {
nebulaDB := NewNebulaDB(dbConnString)
keysDB := NewKeysDB(keysDbPath)

logger.Debug("queen created")

return &Queen{
nebulaDB: nebulaDB,
keysDB: keysDB,
ants: []*Ant{},
}
}

func (q *Queen) Run(ctx context.Context) {
t := time.NewTicker(CRAWL_INTERVAL)
q.routine(ctx)

for {
select {
Expand All @@ -52,6 +66,7 @@ func (q *Queen) routine(ctx context.Context) {

// zones correspond to the prefixes of the tries that must be covered by an ant
zones := trieZones(networkTrie, BUCKET_SIZE)
logger.Debugf("%d zones must be covered by ants", len(zones))

// convert string zone to bitstr.Key
missingKeys := make([]bitstr.Key, len(zones))
Expand All @@ -77,6 +92,10 @@ func (q *Queen) routine(ctx context.Context) {
excessAntsIndices = append(excessAntsIndices, index)
}
}
logger.Debugf("currently have %d ants", len(q.ants))
logger.Debugf("need %d extra ants", len(missingKeys))
logger.Debugf("removing %d ants", len(excessAntsIndices))

// remove ants
for _, index := range excessAntsIndices {
q.ants[index].Close()
Expand All @@ -92,6 +111,8 @@ func (q *Queen) routine(ctx context.Context) {
}
q.ants = append(q.ants, ant)
}

logger.Debug("queen routine over")
}

func trieZones[K kad.Key[K], T any](t *trie.Trie[K, T], zoneSize int) []string {
Expand Down

0 comments on commit 9daff28

Please sign in to comment.