Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: upstream pruning fixes to v1.2.0 #1204

Merged
merged 46 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
0db0077
separate pruning services
srene Oct 18, 2024
2601c54
prune update
srene Oct 18, 2024
ab4d211
test fix
srene Oct 18, 2024
01d7423
test fix
srene Oct 18, 2024
427eadc
fixing tests
srene Oct 18, 2024
59b929c
minor edit
srene Oct 18, 2024
d008de9
lint fix
srene Oct 18, 2024
05a7a96
refactor base height store
srene Oct 22, 2024
731112d
fixes
srene Oct 22, 2024
0fe8df5
drying
srene Oct 22, 2024
08a2330
logging
srene Oct 22, 2024
4d2bf08
mocks
srene Oct 22, 2024
17968af
lint
srene Oct 22, 2024
09892bd
fix
srene Oct 22, 2024
12e10ec
minor edit
srene Oct 23, 2024
f6486b6
fix
srene Oct 26, 2024
26a205c
cleaning
srene Oct 26, 2024
7a47e11
logging
srene Oct 26, 2024
669159b
fixing merge
srene Oct 26, 2024
84b712f
fix
srene Oct 26, 2024
7248745
log
srene Oct 26, 2024
b8dbbde
check before deleting
srene Oct 27, 2024
c285d84
lint
srene Oct 27, 2024
6a4749c
test
srene Oct 27, 2024
ad61fc0
fix
srene Oct 27, 2024
7db51bf
fix merge
srene Oct 28, 2024
51a8154
fix
srene Oct 28, 2024
e24e318
improving logs
srene Oct 29, 2024
4032d57
updates
srene Nov 5, 2024
8425783
fixes
srene Nov 7, 2024
a04a4e0
fix rpc
srene Nov 7, 2024
b4f4dd6
lint fix
srene Nov 7, 2024
49ae0d1
test fix
srene Nov 7, 2024
62eb4cc
small improvements
srene Nov 7, 2024
d70ba3c
small improvements
srene Nov 7, 2024
dd1894b
small improvements
srene Nov 7, 2024
8779e83
small improvements
srene Nov 7, 2024
9a3805a
small improvements
srene Nov 7, 2024
53821e7
fixing test
srene Nov 7, 2024
b49606e
lint fix
srene Nov 7, 2024
e0abede
it leak
srene Nov 7, 2024
23864d5
discard not found error tx event
srene Nov 7, 2024
4eb7cc3
clean
srene Nov 7, 2024
cd7b2f8
prune fix
srene Nov 7, 2024
b5d914c
lint fix
srene Nov 7, 2024
c0b7939
inc toflush before
srene Nov 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
test fix
  • Loading branch information
srene committed Oct 22, 2024
commit 01d74233d0c362bdecaf7f39d50d8b916ea5a014
19 changes: 9 additions & 10 deletions block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,27 @@ func (m *Manager) PruneBlocks(retainHeight uint64) {
pruned, err := m.P2PClient.RemoveBlocks(context.Background(), m.State.BaseBlocksyncHeight, retainHeight)
if err != nil {
m.logger.Error("pruning blocksync store", "retain_height", retainHeight, "err", err)
} else {
m.logger.Debug("blocksync store pruned", "from", m.State.BaseBlocksyncHeight, "to", retainHeight, "pruned", pruned)
m.State.BaseBlocksyncHeight = retainHeight
}
m.logger.Debug("blocksync store pruned", "from", m.State.BaseBlocksyncHeight, "to", retainHeight, "pruned", pruned)
m.State.BaseBlocksyncHeight = retainHeight

// prune blocks from indexer store
pruned, err = m.IndexerService.Prune(m.State.BaseIndexerHeight, retainHeight)
if err != nil {
m.logger.Error("pruning indexer", "retain_height", retainHeight, "err", err)
} else {
m.logger.Debug("indexer store pruned", "from", m.State.BaseIndexerHeight, "to", retainHeight, "pruned", pruned)
m.State.BaseIndexerHeight = retainHeight
}

m.logger.Debug("indexer store pruned", "from", m.State.BaseIndexerHeight, "to", retainHeight, "pruned", pruned)
m.State.BaseIndexerHeight = retainHeight

// prune blocks from dymint store
pruned, err = m.Store.PruneStore(m.State.BaseHeight, retainHeight, m.logger)
if err != nil {
m.logger.Error("pruning block store", "retain_height", retainHeight, "err", err)
} else {
m.logger.Debug("dymint store pruned", "from", m.State.BaseHeight, "to", retainHeight, "pruned", pruned)
m.State.BaseHeight = retainHeight
}

m.logger.Debug("dymint store pruned", "from", m.State.BaseHeight, "to", retainHeight, "pruned", pruned)
m.State.BaseHeight = retainHeight

// store state with base heights updates
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
Expand Down
16 changes: 5 additions & 11 deletions indexers/blockindexer/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,7 @@ func (idx *BlockerIndexer) Prune(from, to uint64, logger log.Logger) (uint64, er
if to <= from {
return 0, fmt.Errorf("to height must be greater than from height: to: %d: from: %d: %w", to, from, gerrc.ErrInvalidArgument)
}
blocksPruned, err := idx.pruneBlocks(from, to, logger)
return blocksPruned, err
return idx.pruneBlocks(from, to, logger)
}

func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint64, error) {
Expand All @@ -553,14 +552,6 @@ func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint
}

for h := int64(from); h < int64(to); h++ {
ok, err := idx.Has(h)
if err != nil {
logger.Debug("pruning block indexer checking height", "err", err)
continue
}
if !ok {
continue
}
key, err := heightKey(h)
if err != nil {
logger.Debug("pruning block indexer getting height key", "err", err)
Expand All @@ -570,11 +561,14 @@ func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint
logger.Debug("pruning block indexer deleting height key", "err", err)
continue
}

// we update here to avoid acccumulating batches in case pruneEvents fails
pruned++

if err := idx.pruneEvents(h, batch); err != nil {
logger.Debug("pruning block indexer events", "err", err)
continue
}
pruned++

// flush every 1000 blocks to avoid batches becoming too large
if pruned%1000 == 0 && pruned > 0 {
Expand Down
2 changes: 1 addition & 1 deletion indexers/txindex/indexer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {

txPruned, err := txIndexer.Prune(1, 2, log.NewNopLogger())
require.NoError(t, err)
expectedTxPruned := uint64(2)
expectedTxPruned := uint64(1)
require.Equal(t, expectedTxPruned, txPruned)

}
28 changes: 16 additions & 12 deletions indexers/txindex/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,27 +595,31 @@ func (txi *TxIndex) pruneTxs(from, to uint64) (uint64, error) {
it := txi.store.PrefixIterator(prefixForHeight(int64(h)))
defer it.Discard()

if err := txi.pruneEvents(h, batch); err != nil {
continue
}
pruned++

for ; it.Valid(); it.Next() {
if err := batch.Delete(it.Key()); err != nil {
continue
}

if err := batch.Delete(it.Value()); err != nil {
continue
}
if err := txi.pruneEvents(h, batch); err != nil {
continue
}
pruned++
// flush every 1000 txs to avoid batches becoming too large
if pruned%1000 == 0 && pruned > 0 {
err := flush(batch, int64(h))
if err != nil {
return 0, err
}
batch.Discard()
batch = txi.store.NewBatch()
}

// flush every 1000 txs to avoid batches becoming too large
if pruned%1000 == 0 && pruned > 0 {
err := flush(batch, int64(h))
if err != nil {
return 0, err
}
batch.Discard()
batch = txi.store.NewBatch()
}

}

err := flush(batch, int64(to))
Expand Down
2 changes: 1 addition & 1 deletion p2p/block_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,5 @@ func (blocksync *BlockSync) LoadBlock(ctx context.Context, cid cid.Cid) (BlockDa

// RemoveBlock removes the block from the DAGservice.
func (blocksync *BlockSync) DeleteBlock(ctx context.Context, cid cid.Cid) error {
return blocksync.dsrv.Remove(ctx, cid)
return blocksync.dsrv.DeleteBlock(ctx, cid)
}
18 changes: 18 additions & 0 deletions p2p/block_sync_dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,24 @@ func (bsDagService *BlockSyncDagService) LoadBlock(ctx context.Context, cid cid.
return data, nil
}

func (bsDagService *BlockSyncDagService) DeleteBlock(ctx context.Context, cid cid.Cid) error {

// first it gets the root node
root, err := bsDagService.Get(ctx, cid)
if err != nil {
return err
}

// then it iterates all the cids to remove them from the block store
for _, l := range root.Links() {
err := bsDagService.Remove(ctx, l.Cid)
if err != nil {
return nil
}
}
return nil
}

// dagReader is used to read the DAG (all the block chunks) from the root (IPLD) node
func dagReader(root ipld.Node, ds ipld.DAGService) (io.Reader, error) {
ctx := context.Background()
Expand Down