Skip to content

Commit

Permalink
Merge branch 'main' into feature/proto-upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
mzabaluev committed Oct 25, 2023
2 parents 69f391e + 822b277 commit 71c76c8
Show file tree
Hide file tree
Showing 83 changed files with 1,109 additions and 383 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[proxy]` Expand `ClientCreator` interface to allow
for per-"connection" control of client creation
([\#1141](https://github.com/cometbft/cometbft/pull/1141))
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
- `[rpc]` The endpoints `broadcast_tx_*` now return an error when the node is
performing block sync or state sync.
([\#785](https://github.com/cometbft/cometbft/issues/785))
- `[mempool]` When the node is performing block sync or state sync, the mempool
reactor now discards incoming transactions from peers, and does not propagate
transactions to peers.
([\#785](https://github.com/cometbft/cometbft/issues/785))
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- `[abci/client]` Add consensus-synchronized local client creator,
which only imposes a mutex on the consensus "connection", leaving
the concurrency of all other "connections" up to the application
([\#1141](https://github.com/cometbft/cometbft/pull/1141))
3 changes: 3 additions & 0 deletions .changelog/unreleased/improvements/1141-abci-unsync-proxy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[abci/client]` Add fully unsynchronized local client creator, which
imposes no mutexes on the application, leaving all handling of concurrency up
to the application ([\#1141](https://github.com/cometbft/cometbft/pull/1141))
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
which is similar to `HasVoteMessage`; and random sleep in the loop broadcasting those messages.
The sleep can be configured with new config `peer_gossip_intraloop_sleep_duration`, which is set to 0
by default as this is experimental.
Our scale tests show substantial bandwith improvement with a value of 50 ms.
Our scale tests show substantial bandwidth improvement with a value of 50 ms.
([\#904](https://github.com/cometbft/cometbft/pull/904))
2 changes: 1 addition & 1 deletion .github/workflows/janitor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 3
steps:
- uses: styfle/cancel-workflow-action@0.11.0
- uses: styfle/cancel-workflow-action@0.12.0
with:
workflow_id: 1041851,1401230,2837803
access_token: ${{ github.token }}
2 changes: 1 addition & 1 deletion .github/workflows/proto-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
timeout-minutes: 5
steps:
- uses: actions/checkout@v4
- uses: bufbuild/buf-setup-action@v1.26.1
- uses: bufbuild/buf-setup-action@v1.27.1
- uses: bufbuild/buf-lint-action@v1
with:
input: 'proto'
3 changes: 2 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ issues:
exclude-rules:
- path: _test\.go
linters:
- goconst
- gosec
- path: \.pb\.go
linters:
Expand All @@ -45,6 +44,8 @@ linters-settings:
max-blank-identifiers: 3
golint:
min-confidence: 0
goconst:
ignore-tests: true
maligned:
suggest-new: true
misspell:
Expand Down
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
which is similar to `HasVoteMessage`; and random sleep in the loop broadcasting those messages.
The sleep can be configured with new config `peer_gossip_intraloop_sleep_duration`, which is set to 0
by default as this is experimental.
Our scale tests show substantial bandwith improvement with a value of 50 ms.
Our scale tests show substantial bandwidth improvement with a value of 50 ms.
([\#904](https://github.com/cometbft/cometbft/pull/904))
- Update Apalache type annotations in the light client spec ([#955](https://github.com/cometbft/cometbft/pull/955))
- `[node]` Remove genesis persistence in state db, replaced by a hash
Expand Down
2 changes: 1 addition & 1 deletion UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ The main motivation is the reduction of the storage footprint.

Events indexed with previous CometBFT or Tendermint Core versions, will still be transparently processed.
There is no need to re-index the events. This function field is not exposed to queries, and was not
visible to users. However, if you forked CometBFT and changed the indexer code directly to accomodate for this,
visible to users. However, if you forked CometBFT and changed the indexer code directly to accommodate for this,
this will impact your code.

## v0.37.0
Expand Down
16 changes: 10 additions & 6 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ type localClient struct {

var _ Client = (*localClient)(nil)

// NewLocalClient creates a local client, which wraps the application interface that
// Tendermint as the client will call to the application as the server. The only
// difference, is that the local client has a global mutex which enforces serialization
// of all the ABCI calls from Tendermint to the Application.
// NewLocalClient creates a local client, which wraps the application interface
// that Comet as the client will call to the application as the server.
//
// Concurrency control in each client instance is enforced by way of a single
// mutex. If a mutex is not supplied (i.e. if mtx is nil), then one will be
// created.
func NewLocalClient(mtx *cmtsync.Mutex, app types.Application) Client {
if mtx == nil {
mtx = new(cmtsync.Mutex)
Expand Down Expand Up @@ -135,15 +137,17 @@ func (app *localClient) OfferSnapshot(ctx context.Context, req *types.RequestOff
}

func (app *localClient) LoadSnapshotChunk(ctx context.Context,
req *types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
req *types.RequestLoadSnapshotChunk,
) (*types.ResponseLoadSnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

return app.Application.LoadSnapshotChunk(ctx, req)
}

func (app *localClient) ApplySnapshotChunk(ctx context.Context,
req *types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
req *types.RequestApplySnapshotChunk,
) (*types.ResponseApplySnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

Expand Down
136 changes: 136 additions & 0 deletions abci/client/unsync_local_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package abcicli

import (
"context"
"sync"

types "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/service"
)

type unsyncLocalClient struct {
service.BaseService

types.Application

mtx sync.Mutex
Callback
}

var _ Client = (*unsyncLocalClient)(nil)

// NewUnsyncLocalClient creates a local client, which wraps the application
// interface that Comet as the client will call to the application as the
// server.
//
// This differs from [NewLocalClient] in that it returns a client that only
// maintains a mutex over the callback used by CheckTxAsync and not over the
// application, leaving it up to the proxy to handle all concurrency. If the
// proxy does not impose any concurrency restrictions, it is then left up to
// the application to implement its own concurrency for the relevant group of
// calls.
func NewUnsyncLocalClient(app types.Application) Client {
cli := &unsyncLocalClient{
Application: app,
}
cli.BaseService = *service.NewBaseService(nil, "unsyncLocalClient", cli)
return cli
}

func (app *unsyncLocalClient) SetResponseCallback(cb Callback) {
app.mtx.Lock()
app.Callback = cb
app.mtx.Unlock()
}

func (app *unsyncLocalClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
res, err := app.Application.CheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
), nil
}

func (app *unsyncLocalClient) callback(req *types.Request, res *types.Response) *ReqRes {
app.Callback(req, res)
rr := newLocalReqRes(req, res)
rr.callbackInvoked = true
return rr
}

//-------------------------------------------------------

func (app *unsyncLocalClient) Error() error {
return nil
}

func (app *unsyncLocalClient) Flush(context.Context) error {
return nil
}

func (app *unsyncLocalClient) Echo(_ context.Context, msg string) (*types.ResponseEcho, error) {
return &types.ResponseEcho{Message: msg}, nil
}

func (app *unsyncLocalClient) Info(ctx context.Context, req *types.RequestInfo) (*types.ResponseInfo, error) {
return app.Application.Info(ctx, req)
}

func (app *unsyncLocalClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return app.Application.CheckTx(ctx, req)
}

func (app *unsyncLocalClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
return app.Application.Query(ctx, req)
}

func (app *unsyncLocalClient) Commit(ctx context.Context, req *types.RequestCommit) (*types.ResponseCommit, error) {
return app.Application.Commit(ctx, req)
}

func (app *unsyncLocalClient) InitChain(ctx context.Context, req *types.RequestInitChain) (*types.ResponseInitChain, error) {
return app.Application.InitChain(ctx, req)
}

func (app *unsyncLocalClient) ListSnapshots(ctx context.Context, req *types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
return app.Application.ListSnapshots(ctx, req)
}

func (app *unsyncLocalClient) OfferSnapshot(ctx context.Context, req *types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
return app.Application.OfferSnapshot(ctx, req)
}

func (app *unsyncLocalClient) LoadSnapshotChunk(ctx context.Context,
req *types.RequestLoadSnapshotChunk,
) (*types.ResponseLoadSnapshotChunk, error) {
return app.Application.LoadSnapshotChunk(ctx, req)
}

func (app *unsyncLocalClient) ApplySnapshotChunk(ctx context.Context,
req *types.RequestApplySnapshotChunk,
) (*types.ResponseApplySnapshotChunk, error) {
return app.Application.ApplySnapshotChunk(ctx, req)
}

func (app *unsyncLocalClient) PrepareProposal(ctx context.Context, req *types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
return app.Application.PrepareProposal(ctx, req)
}

func (app *unsyncLocalClient) ProcessProposal(ctx context.Context, req *types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
return app.Application.ProcessProposal(ctx, req)
}

func (app *unsyncLocalClient) ExtendVote(ctx context.Context, req *types.RequestExtendVote) (*types.ResponseExtendVote, error) {
return app.Application.ExtendVote(ctx, req)
}

func (app *unsyncLocalClient) VerifyVoteExtension(ctx context.Context, req *types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
return app.Application.VerifyVoteExtension(ctx, req)
}

func (app *unsyncLocalClient) FinalizeBlock(ctx context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
return app.Application.FinalizeBlock(ctx, req)
}
2 changes: 1 addition & 1 deletion api/cometbft/crypto/v1beta1/proof.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type consensusReactor interface {
SwitchToConsensus(state sm.State, skipWAL bool)
}

type mempoolReactor interface {
// for when we finish doing block sync or state sync
EnableInOutTxs()
}

type peerError struct {
err error
peerID p2p.ID
Expand Down Expand Up @@ -68,7 +73,6 @@ type Reactor struct {
func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
blockSync bool, metrics *Metrics, offlineStateSyncHeight int64,
) *Reactor {

storeHeight := store.Height()
if storeHeight == 0 {
// If state sync was performed offline and the stores were bootstrapped to height H
Expand Down Expand Up @@ -386,12 +390,14 @@ FOR_LOOP:
continue FOR_LOOP
}
if bcR.pool.IsCaughtUp() {
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
bcR.Logger.Info("Time to switch to consensus mode!", "height", height)
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
}
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
if ok {
if memR, ok := bcR.Switch.Reactor("MEMPOOL").(mempoolReactor); ok {
memR.EnableInOutTxs()
}
if conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor); ok {
conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
}
// else {
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ type P2PConfig struct { //nolint: maligned
// Testing params.
// Force dial to fail
TestDialFail bool `mapstructure:"test_dial_fail"`
// FUzz connection
// Fuzz connection
TestFuzz bool `mapstructure:"test_fuzz"`
TestFuzzConfig *FuzzConnConfig `mapstructure:"test_fuzz_config"`
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {

// Send our state to peer.
// If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !br.reactor.waitSync {
if !br.reactor.WaitSync() {
br.reactor.sendNewRoundStepMessage(peer)
}
}
Expand Down
Loading

0 comments on commit 71c76c8

Please sign in to comment.