Skip to content

Commit

Permalink
Update migration
Browse files Browse the repository at this point in the history
  • Loading branch information
perekopskiy committed Oct 19, 2021
1 parent 232f987 commit af18707
Show file tree
Hide file tree
Showing 3 changed files with 250 additions and 115 deletions.
131 changes: 91 additions & 40 deletions core/bin/tx_filters_migration/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,105 @@
use tokio::time::{sleep, Duration};
use zksync_storage::{utils::affected_accounts, StorageProcessor};

const DELTA: u32 = 100;
const TIMEOUT: Duration = Duration::from_secs(2);

macro_rules! wait_for_success {
($l:expr) => {
loop {
match $l {
Ok(result) => break result,
Err(e) => println!("{}", e),
}
sleep(TIMEOUT).await;
}
};
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut storage = StorageProcessor::establish_connection().await?;
let mut transaction = storage.start_transaction().await?;

let mut addresses = Vec::new();
let mut tokens = Vec::new();
let mut hashes = Vec::new();

let txs = transaction
.chain()
.operations_ext_schema()
.load_all_executed_transactions()
.await?;
for (hash, tx) in txs {
let affected_accounts = affected_accounts(&tx, &mut transaction).await?;
let used_tokens = tx.tokens();
for address in affected_accounts {
for token in used_tokens.clone() {

let mut last_updated_block = wait_for_success!(
storage
.chain()
.operations_ext_schema()
.last_block_with_updated_tx_filters()
.await
);

loop {
sleep(TIMEOUT).await;
let mut transaction = wait_for_success!(storage.start_transaction().await);
let last_saved_block = wait_for_success!(
transaction
.chain()
.block_schema()
.get_last_saved_block()
.await
);

let from = last_updated_block;
let to = std::cmp::min(last_updated_block + DELTA, last_saved_block);

let mut addresses = Vec::new();
let mut tokens = Vec::new();
let mut hashes = Vec::new();

let txs = wait_for_success!(
transaction
.chain()
.operations_ext_schema()
.load_executed_txs_in_block_range(from, to)
.await
);
for (hash, tx) in txs {
let affected_accounts =
wait_for_success!(affected_accounts(&tx, &mut transaction).await);
let used_tokens = tx.tokens();
for address in affected_accounts {
for token in used_tokens.clone() {
addresses.push(address.as_bytes().to_vec());
tokens.push(token.0 as i32);
hashes.push(hash.as_bytes().to_vec());
}
}
}

let priority_ops = wait_for_success!(
transaction
.chain()
.operations_ext_schema()
.load_executed_priority_ops_in_block_range(from, to)
.await
);
for (hash, op) in priority_ops {
let op = op.try_get_priority_op().unwrap();
let affected_accounts = op.affected_accounts();
let token = op.token_id();
for address in affected_accounts {
addresses.push(address.as_bytes().to_vec());
tokens.push(token.0 as i32);
hashes.push(hash.as_bytes().to_vec());
}
}
}

let priority_ops = transaction
.chain()
.operations_ext_schema()
.load_all_executed_priority_operations()
.await?;
for (hash, op) in priority_ops {
let op = op.try_get_priority_op().unwrap();
let affected_accounts = op.affected_accounts();
let token = op.token_id();
for address in affected_accounts {
addresses.push(address.as_bytes().to_vec());
tokens.push(token.0 as i32);
hashes.push(hash.as_bytes().to_vec());
wait_for_success!(
transaction
.chain()
.operations_ext_schema()
.save_executed_tx_filters(addresses.clone(), tokens.clone(), hashes.clone())
.await
);

match transaction.commit().await {
Ok(_) => {
println!("Updated from {} to {}", from, to);
last_updated_block = to;
}
Err(e) => {
println!("Failed to commit transaction: {}", e);
}
}
}

transaction
.chain()
.operations_ext_schema()
.save_executed_tx_filters(addresses, tokens, hashes)
.await?;
transaction.commit().await?;

println!("Success");
Ok(())
}
180 changes: 111 additions & 69 deletions core/lib/storage/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,33 @@
]
}
},
"052bc740befe43cd3d8d915371cb055187d4ff4ebf019fe12c8dc85b296acc47": {
"query": "SELECT tx_hash, tx FROM executed_transactions WHERE block_number BETWEEN $1 AND $2",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "tx_hash",
"type_info": "Bytea"
},
{
"ordinal": 1,
"name": "tx",
"type_info": "Jsonb"
}
],
"parameters": {
"Left": [
"Int8",
"Int8"
]
},
"nullable": [
false,
false
]
}
},
"065f83bfa5ed2f2dff30ce38b9d2dd4518ec00f81a98bc378f30a07db5f7b678": {
"query": "\n WITH tx_hashes AS (\n SELECT DISTINCT tx_hash FROM tx_filters\n WHERE address = $1 AND ($2::boolean OR token = $3)\n INTERSECT\n SELECT DISTINCT tx_hash FROM tx_filters\n WHERE $4::boolean OR (address = $5 AND ($2::boolean OR token = $3))\n )\n SELECT COUNT(*) as \"count!\" FROM tx_hashes\n ",
"describe": {
Expand Down Expand Up @@ -571,6 +598,33 @@
"nullable": []
}
},
"15021baae00c1cc0a1da3cfc3794e78ede86b761ef2765f90af050fdbf42a833": {
"query": "SELECT tx_hash, operation FROM executed_priority_operations WHERE block_number BETWEEN $1 AND $2",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "tx_hash",
"type_info": "Bytea"
},
{
"ordinal": 1,
"name": "operation",
"type_info": "Jsonb"
}
],
"parameters": {
"Left": [
"Int8",
"Int8"
]
},
"nullable": [
false,
false
]
}
},
"15faacf14edd991dedc35011ef12eefc5a04771a6b3f24a4c655f9259c9ea572": {
"query": "SELECT * FROM account_balance_updates WHERE block_number > $1 AND block_number <= $2 ",
"describe": {
Expand Down Expand Up @@ -870,6 +924,24 @@
]
}
},
"1befbe4a2d8c2847779eba2b1981da547cf6a646ce5451ac63c67f5b765194f9": {
"query": "\n SELECT MAX(block_number) as \"max!\" FROM tx_filters\n INNER JOIN executed_transactions\n ON tx_filters.tx_hash = executed_transactions.tx_hash\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "max!",
"type_info": "Int8"
}
],
"parameters": {
"Left": []
},
"nullable": [
null
]
}
},
"1c02281a5f82e18874515bad5038402ae5718ec633b56463c99fee0beb0e8afd": {
"query": "\n SELECT eth_operations.*,\n aggregate_operations.id as \"agg_op_id?\",\n aggregate_operations.arguments as \"arguments?\"\n FROM eth_operations\n LEFT JOIN eth_aggregated_ops_binding\n ON eth_aggregated_ops_binding.eth_op_id = eth_operations.id\n LEFT JOIN aggregate_operations\n ON aggregate_operations.id = eth_aggregated_ops_binding.op_id\n WHERE eth_operations.confirmed = false\n ORDER BY eth_operations.id ASC\n ",
"describe": {
Expand Down Expand Up @@ -1726,30 +1798,6 @@
"nullable": []
}
},
"368a2554817fe33f4f39ad2db04226d4980ce461853e7eddd1d50cb23e2762bc": {
"query": "SELECT tx_hash, tx FROM executed_transactions",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "tx_hash",
"type_info": "Bytea"
},
{
"ordinal": 1,
"name": "tx",
"type_info": "Jsonb"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false
]
}
},
"393fa462bb0a3b247c99946e569f06fc7fa1f742d564adce560ac69e1729fece": {
"query": "SELECT * FROM balances WHERE account_id = ANY($1)",
"describe": {
Expand Down Expand Up @@ -3794,20 +3842,6 @@
]
}
},
"8c0c66910d1fdfd08ba2e1c3db282720a67b05067abf71d0f0b27c27047ce0c7": {
"query": "\n INSERT INTO tx_filters (address, token, tx_hash)\n SELECT u.address, u.token, u.tx_hash\n FROM UNNEST ($1::bytea[], $2::integer[], $3::bytea[])\n AS u(address, token, tx_hash)\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"ByteaArray",
"Int4Array",
"ByteaArray"
]
},
"nullable": []
}
},
"8c2b6d94cb84616a33ecfb94be7153b3d760b456fa24af058076a69a6f4f204c": {
"query": "\n SELECT * FROM mint_nft_updates \n WHERE token_id = $1\n ",
"describe": {
Expand Down Expand Up @@ -4093,6 +4127,24 @@
]
}
},
"9442f24e1c161116ba90b100676a5ee1c3a68853c984fe5ec97575cc899aae14": {
"query": "\n SELECT MAX(block_number) as \"max!\" FROM tx_filters\n INNER JOIN executed_priority_operations\n ON tx_filters.tx_hash = executed_priority_operations.tx_hash\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "max!",
"type_info": "Int8"
}
],
"parameters": {
"Left": []
},
"nullable": [
null
]
}
},
"94963192743a692ece0adf4f1607b808160476a25e0b08a05c9424002533622b": {
"query": "\n WITH transactions AS (\n SELECT\n tx_hash,\n tx as op,\n block_number,\n created_at,\n success,\n fail_reason,\n Null::bytea as eth_hash,\n Null::bigint as priority_op_serialid,\n block_index,\n batch_id\n FROM executed_transactions\n WHERE block_number = $1 AND created_at >= $2\n ), priority_ops AS (\n SELECT\n tx_hash,\n operation as op,\n block_number,\n created_at,\n true as success,\n Null as fail_reason,\n eth_hash,\n priority_op_serialid,\n block_index,\n Null::bigint as batch_id\n FROM executed_priority_operations\n WHERE block_number = $1 AND created_at >= $2\n ), everything AS (\n SELECT * FROM transactions\n UNION ALL\n SELECT * FROM priority_ops\n )\n SELECT\n tx_hash as \"tx_hash!\",\n block_number as \"block_number!\",\n op as \"op!\",\n created_at as \"created_at!\",\n success as \"success!\",\n fail_reason as \"fail_reason?\",\n eth_hash as \"eth_hash?\",\n priority_op_serialid as \"priority_op_serialid?\",\n batch_id as \"batch_id?\"\n FROM everything\n ORDER BY created_at ASC, block_index ASC\n LIMIT $3\n ",
"describe": {
Expand Down Expand Up @@ -4836,13 +4888,13 @@
"Left": []
},
"nullable": [
false,
false,
false,
false,
false,
false,
false
true,
true,
true,
true,
true,
true,
true
]
}
},
Expand Down Expand Up @@ -5182,30 +5234,6 @@
]
}
},
"bf9a61ef4f49f9929f2d814d0e252e4e9baa04b25fc3970b3b561a29061d539f": {
"query": "SELECT tx_hash, operation FROM executed_priority_operations",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "tx_hash",
"type_info": "Bytea"
},
{
"ordinal": 1,
"name": "operation",
"type_info": "Jsonb"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false
]
}
},
"c08f5c773d9475d06ae0a0e0771de9b004e1a3c9811a8a165acf079c198a9cb5": {
"query": "\n SELECT id, address, decimals, kind as \"kind: _\", symbol FROM tokens\n WHERE id = $1\n LIMIT 1\n ",
"describe": {
Expand Down Expand Up @@ -5897,6 +5925,20 @@
"nullable": []
}
},
"d106212871e6b0266259fa0f819ab51c1e58ec799a45b618a4ec4d67d00f49be": {
"query": "\n INSERT INTO tx_filters (address, token, tx_hash)\n SELECT u.address, u.token, u.tx_hash\n FROM UNNEST ($1::bytea[], $2::integer[], $3::bytea[])\n AS u(address, token, tx_hash)\n ON CONFLICT ON CONSTRAINT tx_filters_pkey DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"ByteaArray",
"Int4Array",
"ByteaArray"
]
},
"nullable": []
}
},
"d18525d8bf10383d307bf56110fac63276a82dc8b65b358c098fca7c2991579e": {
"query": "SELECT MAX(id) as max FROM events",
"describe": {
Expand Down
Loading

0 comments on commit af18707

Please sign in to comment.