From af18707ef09628f8a3372f6f647e6d0c26bba6f9 Mon Sep 17 00:00:00 2001 From: perekopskiy Date: Tue, 19 Oct 2021 20:49:58 +0300 Subject: [PATCH] Update migration --- core/bin/tx_filters_migration/src/main.rs | 131 +++++++++---- core/lib/storage/sqlx-data.json | 180 +++++++++++------- .../storage/src/chain/operations_ext/mod.rs | 54 +++++- 3 files changed, 250 insertions(+), 115 deletions(-) diff --git a/core/bin/tx_filters_migration/src/main.rs b/core/bin/tx_filters_migration/src/main.rs index e2ff2378fe..3f4f39dae8 100644 --- a/core/bin/tx_filters_migration/src/main.rs +++ b/core/bin/tx_filters_migration/src/main.rs @@ -1,54 +1,105 @@ +use tokio::time::{sleep, Duration}; use zksync_storage::{utils::affected_accounts, StorageProcessor}; +const DELTA: u32 = 100; +const TIMEOUT: Duration = Duration::from_secs(2); + +macro_rules! wait_for_success { + ($l:expr) => { + loop { + match $l { + Ok(result) => break result, + Err(e) => println!("{}", e), + } + sleep(TIMEOUT).await; + } + }; +} + #[tokio::main] async fn main() -> anyhow::Result<()> { let mut storage = StorageProcessor::establish_connection().await?; - let mut transaction = storage.start_transaction().await?; - - let mut addresses = Vec::new(); - let mut tokens = Vec::new(); - let mut hashes = Vec::new(); - - let txs = transaction - .chain() - .operations_ext_schema() - .load_all_executed_transactions() - .await?; - for (hash, tx) in txs { - let affected_accounts = affected_accounts(&tx, &mut transaction).await?; - let used_tokens = tx.tokens(); - for address in affected_accounts { - for token in used_tokens.clone() { + + let mut last_updated_block = wait_for_success!( + storage + .chain() + .operations_ext_schema() + .last_block_with_updated_tx_filters() + .await + ); + + loop { + sleep(TIMEOUT).await; + let mut transaction = wait_for_success!(storage.start_transaction().await); + let last_saved_block = wait_for_success!( + transaction + .chain() + .block_schema() + .get_last_saved_block() + .await + ); + + let from = last_updated_block; + let to = std::cmp::min(last_updated_block + DELTA, last_saved_block); + + let mut addresses = Vec::new(); + let mut tokens = Vec::new(); + let mut hashes = Vec::new(); + + let txs = wait_for_success!( + transaction + .chain() + .operations_ext_schema() + .load_executed_txs_in_block_range(from, to) + .await + ); + for (hash, tx) in txs { + let affected_accounts = + wait_for_success!(affected_accounts(&tx, &mut transaction).await); + let used_tokens = tx.tokens(); + for address in affected_accounts { + for token in used_tokens.clone() { + addresses.push(address.as_bytes().to_vec()); + tokens.push(token.0 as i32); + hashes.push(hash.as_bytes().to_vec()); + } + } + } + + let priority_ops = wait_for_success!( + transaction + .chain() + .operations_ext_schema() + .load_executed_priority_ops_in_block_range(from, to) + .await + ); + for (hash, op) in priority_ops { + let op = op.try_get_priority_op().unwrap(); + let affected_accounts = op.affected_accounts(); + let token = op.token_id(); + for address in affected_accounts { addresses.push(address.as_bytes().to_vec()); tokens.push(token.0 as i32); hashes.push(hash.as_bytes().to_vec()); } } - } - let priority_ops = transaction - .chain() - .operations_ext_schema() - .load_all_executed_priority_operations() - .await?; - for (hash, op) in priority_ops { - let op = op.try_get_priority_op().unwrap(); - let affected_accounts = op.affected_accounts(); - let token = op.token_id(); - for address in affected_accounts { - addresses.push(address.as_bytes().to_vec()); - tokens.push(token.0 as i32); - hashes.push(hash.as_bytes().to_vec()); + wait_for_success!( + transaction + .chain() + .operations_ext_schema() + .save_executed_tx_filters(addresses.clone(), tokens.clone(), hashes.clone()) + .await + ); + + match transaction.commit().await { + Ok(_) => { + println!("Updated from {} to {}", from, to); + last_updated_block = to; + } + Err(e) => { + println!("Failed to commit transaction: {}", e); + } } } - - transaction - .chain() - .operations_ext_schema() - .save_executed_tx_filters(addresses, tokens, hashes) - .await?; - transaction.commit().await?; - - println!("Success"); - Ok(()) } diff --git a/core/lib/storage/sqlx-data.json b/core/lib/storage/sqlx-data.json index e5eadb3ded..9d236ad8f8 100644 --- a/core/lib/storage/sqlx-data.json +++ b/core/lib/storage/sqlx-data.json @@ -42,6 +42,33 @@ ] } }, + "052bc740befe43cd3d8d915371cb055187d4ff4ebf019fe12c8dc85b296acc47": { + "query": "SELECT tx_hash, tx FROM executed_transactions WHERE block_number BETWEEN $1 AND $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tx_hash", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "tx", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8", + "Int8" + ] + }, + "nullable": [ + false, + false + ] + } + }, "065f83bfa5ed2f2dff30ce38b9d2dd4518ec00f81a98bc378f30a07db5f7b678": { "query": "\n WITH tx_hashes AS (\n SELECT DISTINCT tx_hash FROM tx_filters\n WHERE address = $1 AND ($2::boolean OR token = $3)\n INTERSECT\n SELECT DISTINCT tx_hash FROM tx_filters\n WHERE $4::boolean OR (address = $5 AND ($2::boolean OR token = $3))\n )\n SELECT COUNT(*) as \"count!\" FROM tx_hashes\n ", "describe": { @@ -571,6 +598,33 @@ "nullable": [] } }, + "15021baae00c1cc0a1da3cfc3794e78ede86b761ef2765f90af050fdbf42a833": { + "query": "SELECT tx_hash, operation FROM executed_priority_operations WHERE block_number BETWEEN $1 AND $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tx_hash", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "operation", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8", + "Int8" + ] + }, + "nullable": [ + false, + false + ] + } + }, "15faacf14edd991dedc35011ef12eefc5a04771a6b3f24a4c655f9259c9ea572": { "query": "SELECT * FROM account_balance_updates WHERE block_number > $1 AND block_number <= $2 ", "describe": { @@ -870,6 +924,24 @@ ] } }, + "1befbe4a2d8c2847779eba2b1981da547cf6a646ce5451ac63c67f5b765194f9": { + "query": "\n SELECT MAX(block_number) as \"max!\" FROM tx_filters\n INNER JOIN executed_transactions\n ON tx_filters.tx_hash = executed_transactions.tx_hash\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "max!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + } + }, "1c02281a5f82e18874515bad5038402ae5718ec633b56463c99fee0beb0e8afd": { "query": "\n SELECT eth_operations.*,\n aggregate_operations.id as \"agg_op_id?\",\n aggregate_operations.arguments as \"arguments?\"\n FROM eth_operations\n LEFT JOIN eth_aggregated_ops_binding\n ON eth_aggregated_ops_binding.eth_op_id = eth_operations.id\n LEFT JOIN aggregate_operations\n ON aggregate_operations.id = eth_aggregated_ops_binding.op_id\n WHERE eth_operations.confirmed = false\n ORDER BY eth_operations.id ASC\n ", "describe": { @@ -1726,30 +1798,6 @@ "nullable": [] } }, - "368a2554817fe33f4f39ad2db04226d4980ce461853e7eddd1d50cb23e2762bc": { - "query": "SELECT tx_hash, tx FROM executed_transactions", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "tx_hash", - "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "tx", - "type_info": "Jsonb" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false, - false - ] - } - }, "393fa462bb0a3b247c99946e569f06fc7fa1f742d564adce560ac69e1729fece": { "query": "SELECT * FROM balances WHERE account_id = ANY($1)", "describe": { @@ -3794,20 +3842,6 @@ ] } }, - "8c0c66910d1fdfd08ba2e1c3db282720a67b05067abf71d0f0b27c27047ce0c7": { - "query": "\n INSERT INTO tx_filters (address, token, tx_hash)\n SELECT u.address, u.token, u.tx_hash\n FROM UNNEST ($1::bytea[], $2::integer[], $3::bytea[])\n AS u(address, token, tx_hash)\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "ByteaArray", - "Int4Array", - "ByteaArray" - ] - }, - "nullable": [] - } - }, "8c2b6d94cb84616a33ecfb94be7153b3d760b456fa24af058076a69a6f4f204c": { "query": "\n SELECT * FROM mint_nft_updates \n WHERE token_id = $1\n ", "describe": { @@ -4093,6 +4127,24 @@ ] } }, + "9442f24e1c161116ba90b100676a5ee1c3a68853c984fe5ec97575cc899aae14": { + "query": "\n SELECT MAX(block_number) as \"max!\" FROM tx_filters\n INNER JOIN executed_priority_operations\n ON tx_filters.tx_hash = executed_priority_operations.tx_hash\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "max!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + } + }, "94963192743a692ece0adf4f1607b808160476a25e0b08a05c9424002533622b": { "query": "\n WITH transactions AS (\n SELECT\n tx_hash,\n tx as op,\n block_number,\n created_at,\n success,\n fail_reason,\n Null::bytea as eth_hash,\n Null::bigint as priority_op_serialid,\n block_index,\n batch_id\n FROM executed_transactions\n WHERE block_number = $1 AND created_at >= $2\n ), priority_ops AS (\n SELECT\n tx_hash,\n operation as op,\n block_number,\n created_at,\n true as success,\n Null as fail_reason,\n eth_hash,\n priority_op_serialid,\n block_index,\n Null::bigint as batch_id\n FROM executed_priority_operations\n WHERE block_number = $1 AND created_at >= $2\n ), everything AS (\n SELECT * FROM transactions\n UNION ALL\n SELECT * FROM priority_ops\n )\n SELECT\n tx_hash as \"tx_hash!\",\n block_number as \"block_number!\",\n op as \"op!\",\n created_at as \"created_at!\",\n success as \"success!\",\n fail_reason as \"fail_reason?\",\n eth_hash as \"eth_hash?\",\n priority_op_serialid as \"priority_op_serialid?\",\n batch_id as \"batch_id?\"\n FROM everything\n ORDER BY created_at ASC, block_index ASC\n LIMIT $3\n ", "describe": { @@ -4836,13 +4888,13 @@ "Left": [] }, "nullable": [ - false, - false, - false, - false, - false, - false, - false + true, + true, + true, + true, + true, + true, + true ] } }, @@ -5182,30 +5234,6 @@ ] } }, - "bf9a61ef4f49f9929f2d814d0e252e4e9baa04b25fc3970b3b561a29061d539f": { - "query": "SELECT tx_hash, operation FROM executed_priority_operations", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "tx_hash", - "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "operation", - "type_info": "Jsonb" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false, - false - ] - } - }, "c08f5c773d9475d06ae0a0e0771de9b004e1a3c9811a8a165acf079c198a9cb5": { "query": "\n SELECT id, address, decimals, kind as \"kind: _\", symbol FROM tokens\n WHERE id = $1\n LIMIT 1\n ", "describe": { @@ -5897,6 +5925,20 @@ "nullable": [] } }, + "d106212871e6b0266259fa0f819ab51c1e58ec799a45b618a4ec4d67d00f49be": { + "query": "\n INSERT INTO tx_filters (address, token, tx_hash)\n SELECT u.address, u.token, u.tx_hash\n FROM UNNEST ($1::bytea[], $2::integer[], $3::bytea[])\n AS u(address, token, tx_hash)\n ON CONFLICT ON CONSTRAINT tx_filters_pkey DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "ByteaArray", + "Int4Array", + "ByteaArray" + ] + }, + "nullable": [] + } + }, "d18525d8bf10383d307bf56110fac63276a82dc8b65b358c098fca7c2991579e": { "query": "SELECT MAX(id) as max FROM events", "describe": { diff --git a/core/lib/storage/src/chain/operations_ext/mod.rs b/core/lib/storage/src/chain/operations_ext/mod.rs index 8faf45c0a9..6152e012dc 100644 --- a/core/lib/storage/src/chain/operations_ext/mod.rs +++ b/core/lib/storage/src/chain/operations_ext/mod.rs @@ -1593,10 +1593,18 @@ impl<'a, 'c> OperationsExtSchema<'a, 'c> { Ok(receipts) } - pub async fn load_all_executed_transactions(&mut self) -> QueryResult> { - let records = sqlx::query!("SELECT tx_hash, tx FROM executed_transactions") - .fetch_all(self.0.conn()) - .await?; + pub async fn load_executed_txs_in_block_range( + &mut self, + from_block: BlockNumber, + to_block: BlockNumber, + ) -> QueryResult> { + let records = sqlx::query!( + "SELECT tx_hash, tx FROM executed_transactions WHERE block_number BETWEEN $1 AND $2", + from_block.0 as i64, + to_block.0 as i64 + ) + .fetch_all(self.0.conn()) + .await?; let result = records .into_iter() .map(|record| { @@ -1609,10 +1617,16 @@ impl<'a, 'c> OperationsExtSchema<'a, 'c> { Ok(result) } - pub async fn load_all_executed_priority_operations( + pub async fn load_executed_priority_ops_in_block_range( &mut self, + from_block: BlockNumber, + to_block: BlockNumber, ) -> QueryResult> { - let records = sqlx::query!("SELECT tx_hash, operation FROM executed_priority_operations") + let records = sqlx::query!( + "SELECT tx_hash, operation FROM executed_priority_operations WHERE block_number BETWEEN $1 AND $2", + from_block.0 as i64, + to_block.0 as i64 + ) .fetch_all(self.0.conn()) .await?; let result = records @@ -1627,6 +1641,33 @@ impl<'a, 'c> OperationsExtSchema<'a, 'c> { Ok(result) } + pub async fn last_block_with_updated_tx_filters(&mut self) -> QueryResult { + let max1: i64 = sqlx::query!( + r#" + SELECT MAX(block_number) as "max?" FROM tx_filters + INNER JOIN executed_transactions + ON tx_filters.tx_hash = executed_transactions.tx_hash + "# + ) + .fetch_one(self.0.conn()) + .await? + .max + .unwrap_or_default(); + let max2: i64 = sqlx::query!( + r#" + SELECT MAX(block_number) as "max?" FROM tx_filters + INNER JOIN executed_priority_operations + ON tx_filters.tx_hash = executed_priority_operations.tx_hash + "# + ) + .fetch_one(self.0.conn()) + .await? + .max + .unwrap_or_default(); + + Ok(BlockNumber(std::cmp::max(max1, max2) as u32)) + } + pub async fn save_executed_tx_filters( &mut self, addresses: Vec>, @@ -1639,6 +1680,7 @@ impl<'a, 'c> OperationsExtSchema<'a, 'c> { SELECT u.address, u.token, u.tx_hash FROM UNNEST ($1::bytea[], $2::integer[], $3::bytea[]) AS u(address, token, tx_hash) + ON CONFLICT ON CONSTRAINT tx_filters_pkey DO NOTHING ", &addresses, &tokens,