Skip to content

Commit

Permalink
Merge #2124
Browse files Browse the repository at this point in the history
2124: Refactor eth watcher r=Deniallugo a=Deniallugo

In this PR I've completely separated eth watcher logic and now it's only for receiving the new events from Ethereum.
Now we store priority ops in database. 
The last step for this PR is the complex test with reverting block. 
But I'm not sure how to make it possible  

Co-authored-by: deniallugo <[email protected]>
Co-authored-by: Danil <[email protected]>
  • Loading branch information
bors-matterlabs-dev[bot] and Deniallugo authored Feb 21, 2022
2 parents e7f5620 + a9c978e commit 849d9ce
Show file tree
Hide file tree
Showing 54 changed files with 2,554 additions and 1,260 deletions.
72 changes: 68 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:
run: |
echo ZKSYNC_HOME=$(pwd) >> $GITHUB_ENV
echo $(pwd)/bin >> $GITHUB_PATH
- name: start-services
run: |
docker-compose -f docker-compose-runner.yml down
Expand Down Expand Up @@ -87,7 +87,7 @@ jobs:
run: |
echo ZKSYNC_HOME=$(pwd) >> $GITHUB_ENV
echo $(pwd)/bin >> $GITHUB_PATH
- name: start-services
run: |
docker-compose -f docker-compose-runner.yml down
Expand Down Expand Up @@ -126,7 +126,7 @@ jobs:

- name: integration-rust-sdk
run: ci_run zk test i rust-sdk

- name: integration-withdrawal-helpers
run: ci_run zk test i withdrawal-helpers

Expand Down Expand Up @@ -193,11 +193,75 @@ jobs:
- name: integration-testkit
run: ci_run zk test integration testkit --offline

revert-blocks:
runs-on: [self-hosted, FAST]

steps:
- uses: actions/checkout@v2

- name: setup-env
run: |
echo ZKSYNC_HOME=$(pwd) >> $GITHUB_ENV
echo $(pwd)/bin >> $GITHUB_PATH
- name: start-services
run: |
docker-compose -f docker-compose-runner.yml down
docker-compose -f docker-compose-runner.yml pull
docker-compose -f docker-compose-runner.yml up --build -d geth postgres zk
ci_run sccache --start-server
- name: init
run: |
ci_run ln -s /usr/src/keys/setup keys/setup
ci_run zk
ci_run zk dummy-prover enable --no-redeploy
ci_run zk init
- name: restart dev-liquidity-token-watcher and dev-ticker
run: docker-compose -f docker-compose-runner.yml restart dev-liquidity-token-watcher dev-ticker

- name: run-services
run: |
ci_run zk server core &>server.log &
ci_run zk server api &>api.log &
ci_run sleep 10
- name: loadtest
run: |
ci_run cargo build --release --bin loadnext
cd $ZKSYNC_HOME
docker-compose -f docker-compose-runner.yml exec -T -e ALLOWED_PERCENT=20 -e RUST_LOG=loadnext=debug -e ZKSYNC_RPC_ADDR=http://127.0.0.2:3030 -e WEB3_URL=http://geth:8545 -e ETH_NETWORK=localhost -e MASTER_WALLET_PK=74d8b3a188f7260f67698eb44da07397a298df5427df681ef68c45b34b61f998 -e ACCOUNTS_AMOUNT=5 -e OPERATIONS_PER_ACCOUNT=5 -e MAIN_TOKEN=DAI zk ./target/release/loadnext
docker-compose -f docker-compose-runner.yml exec -T -e ZKSYNC_REST_ADDR=http://127.0.0.2:3001 zk ts-node core/tests/check-block-root-hahes.ts
- name: stop-server
run: |
ci_run killall zksync_server
- name: revert-blocks
run: |
ci_run zk f cargo build --release --bin block_revert
ci_run zk f ./target/release/block_revert --last-correct-block=2 all
- name: check-server
run: |
ci_run zk server &>server.log &
ci_run sleep 30
docker-compose -f docker-compose-runner.yml exec -T -e ZKSYNC_REST_ADDR=http://127.0.0.2:3001 zk ts-node core/tests/check-block-root-hahes.ts
- name: Show logs
if: always()
run: |
ci_run cat server.log
ci_run cat api.log
notify:
if: always()
name: Notify on failures
runs-on: ubuntu-latest
needs: [lint, unit-tests, integration, circuit-tests, testkit]
needs: [lint, unit-tests, integration, circuit-tests, testkit, revert-blocks]
steps:
-
if: failure()
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Cargo.lock
/volumes
/logs
/loadtest-config
/core/tests/blocks.json

.ipynb_checkpoints

Expand Down
18 changes: 6 additions & 12 deletions core/bin/block_revert/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ async fn revert_blocks_in_storage(
.remove_eth_unprocessed_aggregated_ops()
.await?;
println!("`eth_unprocessed_aggregated_ops` table is cleaned");
transaction
.chain()
.operations_schema()
.remove_executed_priority_operations(last_block)
.await?;
println!("`executed_priority_operations` table is cleaned");
transaction
.chain()
.operations_schema()
Expand Down Expand Up @@ -160,18 +154,18 @@ async fn revert_blocks_on_contract(
) -> anyhow::Result<()> {
let tx_arg = Token::Array(blocks.iter().map(stored_block_info).collect());
let data = client.encode_tx_data("revertBlocks", tx_arg);
let gas_limit = 200000 + 5000 * blocks.len();
let gas_limit = 200000 + 15000 * blocks.len();
let signed_tx = client
.sign_prepared_tx(data, Options::with(|f| f.gas = Some(U256::from(gas_limit))))
.await
.map_err(|e| format_err!("Revert blocks send err: {}", e))?;
let receipt = send_raw_tx_and_wait_confirmation(client, signed_tx.raw_tx).await?;
storage.ethereum_schema().get_next_nonce().await
.expect("Ethereum tx has been sent but updating operator nonce in storage has failed. You need to update it manually");
ensure!(
receipt.status == Some(U64::from(1)),
"Tx to contract failed"
);
if receipt.status != Some(U64::from(1)) {
let reason = client.failure_reason(signed_tx.hash).await?;
anyhow::bail!("Tx to contract failed {:?}", reason);
}

println!("Blocks were reverted on contract");
Ok(())
Expand Down Expand Up @@ -218,7 +212,7 @@ struct Opt {
#[structopt(subcommand)]
command: Command,
/// Private key of operator which will call the contract function.
#[structopt(long = "key", env = "REVERT_TOOL_OPERATOR_PRIVATE_KEY")]
#[structopt(long = "key", env = "ETH_SENDER_SENDER_OPERATOR_PRIVATE_KEY")]
operator_private_key: String,
}

Expand Down
1 change: 1 addition & 0 deletions core/bin/data_restore/src/database_storage_interactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,5 +418,6 @@ impl<'a> DatabaseStorageInteractor<'a> {
.get_max_priority_op_serial_id()
.await
.expect("Failed to retrieve maximum priority op serial id")
.unwrap_or(0)
}
}
3 changes: 1 addition & 2 deletions core/bin/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ async fn run_server(components: &ComponentsToRun) {
ticker.clone(),
&JsonRpcConfig::from_env(),
&common_config,
private_config.url.clone(),
private_config.url,
eth_watch_config.confirmations_for_eth_event,
));
}
Expand All @@ -240,7 +240,6 @@ async fn run_server(components: &ComponentsToRun) {
contracts_config.contract_addr,
ticker,
sign_check_sender,
private_config.url,
);
}
}
Expand Down
48 changes: 13 additions & 35 deletions core/bin/zksync_api/src/api_server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,19 @@
// Built-in uses
use std::collections::HashMap;
use std::time::Instant;

// External uses
use num::BigUint;

// Workspace uses
use zksync_api_types::v02::account::{
DepositingAccountBalances, DepositingFunds, OngoingDeposit, OngoingDepositsResp,
};
use zksync_api_types::v02::account::{DepositingAccountBalances, DepositingFunds, OngoingDeposit};
use zksync_storage::StorageProcessor;
use zksync_types::{Address, H256};
use zksync_utils::remove_prefix;

// Local uses
use super::rest::v02::error::Error;
use crate::{
core_api_client::CoreApiClient, fee_ticker::PriceError, utils::token_db_cache::TokenDBCache,
};
use crate::{fee_ticker::PriceError, utils::token_db_cache::TokenDBCache};

pub fn try_parse_hash(query: &str) -> Result<H256, hex::FromHexError> {
const HASH_SIZE: usize = 32; // 32 bytes
Expand All @@ -32,15 +27,15 @@ pub fn try_parse_hash(query: &str) -> Result<H256, hex::FromHexError> {
Ok(H256::from_slice(&slice))
}

pub(crate) async fn depositing_from_pending_ops(
async fn depositing_from_pending_ops(
storage: &mut StorageProcessor<'_>,
tokens: &TokenDBCache,
pending_ops: OngoingDepositsResp,
pending_ops: Vec<OngoingDeposit>,
confirmations_for_eth_event: u64,
) -> Result<DepositingAccountBalances, Error> {
let mut balances = HashMap::new();

for op in pending_ops.deposits {
for op in pending_ops {
let token_symbol = if *op.token_id == 0 {
"ETH".to_string()
} else {
Expand Down Expand Up @@ -72,36 +67,19 @@ pub(crate) async fn depositing_from_pending_ops(
Ok(DepositingAccountBalances { balances })
}

pub(crate) async fn get_pending_ops(
core_api_client: &CoreApiClient,
address: Address,
) -> Result<OngoingDepositsResp, Error> {
let start = Instant::now();

let ongoing_ops = core_api_client
.get_unconfirmed_deposits(address)
.await
.map_err(Error::core_api)?;

// Transform operations into `OngoingDeposit`.
let deposits: Vec<_> = ongoing_ops.into_iter().map(OngoingDeposit::new).collect();

metrics::histogram!(
"api",
start.elapsed(),
"type" => "rpc",
"endpoint_name" => "get_ongoing_deposits"
);
Ok(OngoingDepositsResp { deposits })
}

pub async fn get_depositing(
storage: &mut StorageProcessor<'_>,
core_api_client: &CoreApiClient,
tokens: &TokenDBCache,
address: Address,
confirmations_for_eth_event: u64,
) -> Result<DepositingAccountBalances, Error> {
let pending_ops = get_pending_ops(core_api_client, address).await?;
let pending_ops = storage
.chain()
.mempool_schema()
.get_pending_deposits(address)
.await?
.into_iter()
.map(OngoingDeposit::new)
.collect();
depositing_from_pending_ops(storage, tokens, pending_ops, confirmations_for_eth_event).await
}
3 changes: 1 addition & 2 deletions core/bin/zksync_api/src/api_server/rest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ pub fn start_server_thread_detached(
contract_address: H160,
fee_ticker: FeeTicker,
sign_verifier: mpsc::Sender<VerifySignatureRequest>,
private_url: String,
) -> JoinHandle<()> {
let (handler, panic_sender) = spawn_panic_handler();

Expand All @@ -99,7 +98,7 @@ pub fn start_server_thread_detached(
// TODO remove this config ZKS-815
let config = ZkSyncConfig::from_env();

let api_v01 = ApiV01::new(connection_pool, contract_address, private_url, config);
let api_v01 = ApiV01::new(connection_pool, contract_address, config);
api_v01.spawn_network_status_updater(panic_sender);

start_server(api_v01, fee_ticker, sign_verifier, listen_addr).await;
Expand Down
23 changes: 9 additions & 14 deletions core/bin/zksync_api/src/api_server/rest/v01/api_decl.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
//! Declaration of the API structure.
use crate::{
api_server::rest::{
helpers::*,
v01::{caches::Caches, network_status::SharedNetworkStatus},
},
core_api_client::CoreApiClient,
use crate::api_server::rest::{
helpers::*,
v01::{caches::Caches, network_status::SharedNetworkStatus},
};
use actix_web::error::InternalError;
use actix_web::{web, HttpResponse, Result as ActixResult};
use futures::channel::mpsc;
use zksync_api_types::PriorityOpLookupQuery;

use zksync_config::ZkSyncConfig;
use zksync_storage::{
Expand All @@ -31,7 +27,6 @@ use zksync_types::{block::ExecutedOperations, BlockNumber, PriorityOp, H160, H25
pub struct ApiV01 {
pub(crate) caches: Caches,
pub(crate) connection_pool: ConnectionPool,
pub(crate) api_client: CoreApiClient,
pub(crate) network_status: SharedNetworkStatus,
pub(crate) contract_address: String,
pub(crate) config: ZkSyncConfig,
Expand All @@ -41,14 +36,11 @@ impl ApiV01 {
pub fn new(
connection_pool: ConnectionPool,
contract_address: H160,
private_url: String,
config: ZkSyncConfig,
) -> Self {
let api_client = CoreApiClient::new(private_url);
Self {
caches: Caches::new(config.api.common.caches_size),
connection_pool,
api_client,
network_status: SharedNetworkStatus::default(),
contract_address: format!("{:?}", contract_address),
config,
Expand Down Expand Up @@ -279,8 +271,11 @@ impl ApiV01 {
&self,
eth_tx_hash: H256,
) -> Result<Option<PriorityOp>, anyhow::Error> {
self.api_client
.get_unconfirmed_op(PriorityOpLookupQuery::ByEthHash(eth_tx_hash))
.await
let mut storage = self.connection_pool.access_storage().await?;
Ok(storage
.chain()
.mempool_schema()
.get_pending_operation_by_hash(eth_tx_hash)
.await?)
}
}
Loading

0 comments on commit 849d9ce

Please sign in to comment.