Skip to content

Commit a424849

Browse files
feat(gateway): Full indexing for token migration (#3638)
## What ❔ Stop relying on the `zks_confirmedTokens` RPC endpoint; instead, index tokens bridged via the legacy shared bridge directly from L2 `ContractDeployed` logs (with an on-disk cache for incremental re-runs). ## Why ❔ <!-- Why are these changes done? What goal do they contribute to? What are the principles behind them? --> <!-- The `Why` has to be clear to non-Matter Labs entities running their own ZK Chain --> <!-- Example: PR templates ensure PR reviewers, observers, and future iterators are in context about the evolution of repos. --> ## Is this a breaking change? - [ ] Yes - [ ] No ## Operational changes <!-- Any config changes? Any new flags? Any changes to any scripts? --> <!-- Please add anything that non-Matter Labs entities running their own ZK Chain may need to know --> ## Checklist <!-- Check your PR fulfills the following items. --> <!-- For draft PRs check the boxes as you complete them. --> - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zkstack dev fmt` and `zkstack dev lint`.
1 parent 20cf447 commit a424849

File tree

6 files changed

+322
-259
lines changed

6 files changed

+322
-259
lines changed

crates/zkstack/src/commands/chain/gateway_upgrade.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@ use zkstack_cli_types::L1BatchCommitmentMode;
2828
use zksync_basic_types::{Address, U256};
2929

3030
use crate::{
31-
commands::dev::commands::gateway::{
32-
check_chain_readiness, fetch_chain_info, get_admin_call_builder,
33-
set_upgrade_timestamp_calldata, DAMode, GatewayUpgradeArgsInner, GatewayUpgradeInfo,
31+
commands::dev::commands::{
32+
events_gatherer::DEFAULT_BLOCK_RANGE,
33+
gateway::{
34+
check_chain_readiness, fetch_chain_info, get_admin_call_builder,
35+
set_upgrade_timestamp_calldata, DAMode, GatewayUpgradeArgsInner, GatewayUpgradeInfo,
36+
},
3437
},
3538
messages::MSG_CHAIN_NOT_INITIALIZED,
3639
utils::forge::{fill_forge_private_key, WalletOwner},
@@ -265,6 +268,7 @@ async fn finalize_stage1(
265268
l1_url.clone(),
266269
general_config.get("api.web3_json_rpc.http_url")?,
267270
chain_config.chain_id.as_u64(),
271+
None,
268272
)
269273
.await?;
270274

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
use std::{
2+
fs::{File, OpenOptions},
3+
io::{Read, Write},
4+
time::Duration,
5+
};
6+
7+
use ethers::prelude::*;
8+
use serde::{Deserialize, Serialize};
9+
use tokio::time::sleep;
10+
11+
/// Default number of blocks scanned per `eth_getLogs` query when indexing events.
pub(crate) const DEFAULT_BLOCK_RANGE: u64 = 50_000;
12+
13+
/// A single log entry we want to store.
14+
#[derive(Debug, Serialize, Deserialize)]
15+
pub(crate) struct QueriedLog {
16+
/// The block number where this log was found (optional because logs can have None for pending blocks)
17+
pub(crate) block_number: Option<u64>,
18+
/// The transaction hash for the log
19+
pub(crate) transaction_hash: Option<H256>,
20+
/// The address this log was emitted from
21+
pub(crate) address: Address,
22+
/// All topics for the log
23+
pub(crate) topics: Vec<H256>,
24+
/// The data field of the log
25+
pub(crate) data: Vec<u8>,
26+
}
27+
28+
/// A cache structure that keeps track of how far we’ve fetched and
29+
/// also retains all queried logs in a vector.
30+
#[derive(Debug, Serialize, Deserialize)]
31+
struct Cache {
32+
first_seen_block: u64,
33+
last_seen_block: u64,
34+
all_logs: Vec<QueriedLog>,
35+
}
36+
37+
/// Read a cache file from disk.
38+
fn read_cache_from_file(cache_path: &str) -> Option<Cache> {
39+
let mut file = File::open(cache_path).ok()?;
40+
let mut data = vec![];
41+
file.read_to_end(&mut data).ok()?;
42+
serde_json::from_slice(&data).ok()
43+
}
44+
45+
/// Write a cache file to disk.
46+
fn write_cache_to_file(cache_path: &str, cache: &Cache) -> Result<(), Box<dyn std::error::Error>> {
47+
let serialized = serde_json::to_vec_pretty(cache)?;
48+
let mut file = OpenOptions::new()
49+
.write(true)
50+
.create(true)
51+
.truncate(true)
52+
.open(cache_path)?;
53+
file.write_all(&serialized)?;
54+
Ok(())
55+
}
56+
57+
/// This function will:
58+
/// 1. Read from (or initialize) a cache
59+
/// 2. For each `(address, event_signature)` in `events_to_query`, build a filter
60+
/// 3. Fetch logs in chunks of `block_range`
61+
/// 4. Store each log's topics and data in the cache
62+
/// 5. Write updates back to the cache
63+
///
64+
/// Returns the final vector of logs once all blocks have been processed.
65+
pub(crate) async fn get_logs_for_events(
66+
block_to_start_with: u64,
67+
existing_cache_path: &str,
68+
rpc_url: &str,
69+
block_range: u64,
70+
events_to_query: &[(Address, &str, Option<H256>)], // (contract address, event signature, topic1)
71+
) -> Vec<QueriedLog> {
72+
// ---------------------------------------------------------
73+
// 1. Read or initialize the cache
74+
// ---------------------------------------------------------
75+
let mut cache = read_cache_from_file(existing_cache_path).unwrap_or_else(|| Cache {
76+
first_seen_block: block_to_start_with,
77+
last_seen_block: block_to_start_with,
78+
all_logs: vec![],
79+
});
80+
81+
// If the cache file was found, check the condition about `first_seen_block`
82+
if cache.first_seen_block > block_to_start_with {
83+
// If the cache's first_seen_block is larger than our new start,
84+
// clear the entire cache and reset.
85+
cache.first_seen_block = block_to_start_with;
86+
cache.last_seen_block = block_to_start_with;
87+
cache.all_logs.clear();
88+
}
89+
90+
// ---------------------------------------------------------
91+
// 2. Connect to a provider
92+
// ---------------------------------------------------------
93+
let provider =
94+
Provider::<Http>::try_from(rpc_url).expect("Could not instantiate HTTP Provider");
95+
96+
// Get the latest block so we know how far we can go
97+
let latest_block = provider
98+
.get_block_number()
99+
.await
100+
.expect("Failed to fetch latest block")
101+
.as_u64();
102+
103+
// Our actual starting point is whichever is further along
104+
let mut current_block = cache.last_seen_block;
105+
106+
// ---------------------------------------------------------
107+
// 3. Process logs in chunks of block_range
108+
// ---------------------------------------------------------
109+
while current_block <= latest_block {
110+
let start_of_range = current_block;
111+
let end_of_range = std::cmp::min(start_of_range + block_range, latest_block);
112+
113+
println!("Processing range {start_of_range} - {end_of_range}\n");
114+
115+
// If the entire range is below what we have already processed, skip
116+
if end_of_range < cache.last_seen_block {
117+
// skip range
118+
current_block = end_of_range + 1;
119+
println!("Range is cached, skipping...");
120+
continue;
121+
}
122+
123+
// We'll collect all logs from all event filters in this chunk
124+
let mut new_logs_for_range = Vec::new();
125+
126+
// ---------------------------------------------------------
127+
// 4. Build filters for each event signature and fetch logs
128+
// ---------------------------------------------------------
129+
for (contract_address, event_sig, topic_1) in events_to_query.iter() {
130+
// Example usage with ethers-rs: Filter::new().event(event_sig)
131+
// If your event signature is an "event Foo(address,uint256)" string,
132+
// ethers-rs will do the topic0 hashing automatically.
133+
// Alternatively, you can manually set .topics(Some(vec![event_sig_hash]), None, None, None).
134+
let mut filter = Filter::new()
135+
.address(*contract_address)
136+
.event(event_sig)
137+
.from_block(start_of_range)
138+
.to_block(end_of_range);
139+
140+
if let Some(x) = topic_1 {
141+
filter = filter.topic1(*x);
142+
}
143+
144+
// Sleep for 1 second before each JSON-RPC request to avoid hitting rate limits
145+
sleep(Duration::from_secs(1)).await;
146+
147+
let logs = match provider.get_logs(&filter).await {
148+
Ok(ls) => ls,
149+
Err(e) => {
150+
eprintln!(
151+
"Failed to fetch logs for event signature {event_sig} at {contract_address:?}: {e}"
152+
);
153+
continue;
154+
}
155+
};
156+
157+
// Store each log's topics + data
158+
for log in logs {
159+
new_logs_for_range.push(QueriedLog {
160+
block_number: log.block_number.map(|bn| bn.as_u64()),
161+
transaction_hash: log.transaction_hash,
162+
address: log.address,
163+
topics: log.topics,
164+
data: log.data.to_vec(),
165+
});
166+
}
167+
}
168+
169+
// ---------------------------------------------------------
170+
// 5. Update the cache, flush it to disk
171+
// ---------------------------------------------------------
172+
cache.last_seen_block = end_of_range;
173+
cache.all_logs.extend(new_logs_for_range);
174+
175+
write_cache_to_file(existing_cache_path, &cache).expect("Failed to write cache to file");
176+
177+
println!("Processed and saved the range!");
178+
179+
// Move our current_block pointer forward
180+
if end_of_range == latest_block {
181+
break;
182+
} else {
183+
current_block = end_of_range + 1;
184+
}
185+
}
186+
187+
// Return the logs we have in the cache. If you only want the new logs
188+
// from this run, you could track them differently. But for simplicity,
189+
// we return everything in the cache.
190+
cache.all_logs
191+
}

crates/zkstack/src/commands/dev/commands/gateway.rs

Lines changed: 65 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,19 @@ use zkstack_cli_config::{
1818
};
1919
use zksync_contracts::{chain_admin_contract, hyperchain_contract, DIAMOND_CUT};
2020
use zksync_types::{
21-
ethabi,
21+
address_to_h256, ethabi, h256_to_address,
2222
url::SensitiveUrl,
2323
web3::{keccak256, Bytes},
24-
Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, H256,
25-
L2_NATIVE_TOKEN_VAULT_ADDRESS, U256,
24+
Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, CONTRACT_DEPLOYER_ADDRESS,
25+
H256, L2_NATIVE_TOKEN_VAULT_ADDRESS, U256,
2626
};
2727
use zksync_web3_decl::{
2828
client::{Client, DynClient, L2},
2929
namespaces::{EthNamespaceClient, UnstableNamespaceClient, ZksNamespaceClient},
3030
};
3131

32+
use super::events_gatherer::{get_logs_for_events, DEFAULT_BLOCK_RANGE};
33+
3234
/// To support both functionality of assignment inside local tests
3335
/// and to print out the changes to the user the following function is used.
3436
#[macro_export]
@@ -183,32 +185,43 @@ pub(crate) async fn check_l2_ntv_existence(l2_client: &Box<DynClient<L2>>) -> an
183185
Ok(())
184186
}
185187

186-
pub async fn get_all_tokens(
187-
l2_client: &Box<DynClient<L2>>,
188-
) -> anyhow::Result<Vec<zksync_web3_decl::types::Token>> {
189-
const LIMIT_PER_QUERY: u8 = 255;
190-
191-
let mut result = vec![];
192-
let mut offset = 0;
193-
194-
loop {
195-
let tokens = l2_client
196-
.get_confirmed_tokens(offset, LIMIT_PER_QUERY)
197-
.await?;
198-
199-
if tokens.is_empty() {
200-
break;
201-
}
202-
203-
result.extend(tokens.into_iter());
204-
offset += LIMIT_PER_QUERY as u32;
205-
}
188+
const L2_TOKENS_CACHE: &'static str = "l2-tokens-cache.json";
189+
const CONTRACT_DEPLOYED_EVENT: &'static str = "ContractDeployed(address,bytes32,address)";
190+
191+
/// Returns a list of tokens that can be deployed via the L2 legacy shared bridge.
192+
/// Note that it is a *superset* of all bridged tokens. Some of the deployed contracts
193+
/// are not tokens. The caller will have to double check for each individual token that it is correct.
194+
pub async fn get_deployed_by_bridge(
195+
l2_rpc_url: &str,
196+
l2_shared_bridge_address: Address,
197+
block_range: u64,
198+
) -> anyhow::Result<Vec<Address>> {
199+
println!(
200+
"Retrieving L2 bridged tokens... If done for the first time, it may take a few minutes"
201+
);
202+
// Each legacy bridged token is deployed via the legacy shared bridge.
203+
let total_logs_for_bridged_tokens = get_logs_for_events(
204+
0,
205+
&L2_TOKENS_CACHE,
206+
l2_rpc_url,
207+
block_range,
208+
&[(
209+
CONTRACT_DEPLOYER_ADDRESS,
210+
CONTRACT_DEPLOYED_EVENT,
211+
Some(address_to_h256(&l2_shared_bridge_address)),
212+
)],
213+
)
214+
.await;
215+
println!("Done!");
206216

207-
Ok(result)
217+
Ok(total_logs_for_bridged_tokens
218+
.into_iter()
219+
.map(|log| h256_to_address(&log.topics[3]))
220+
.collect())
208221
}
209222

210-
pub(crate) fn get_ethers_provider(url: String) -> anyhow::Result<Arc<Provider<Http>>> {
211-
let provider = match Provider::<Http>::try_from(&url) {
223+
pub(crate) fn get_ethers_provider(url: &str) -> anyhow::Result<Arc<Provider<Http>>> {
224+
let provider = match Provider::<Http>::try_from(url) {
212225
Ok(provider) => provider,
213226
Err(err) => {
214227
anyhow::bail!("Connection error: {:#?}", err);
@@ -230,14 +243,16 @@ pub(crate) fn get_zk_client(url: &str, l2_chain_id: u64) -> anyhow::Result<Box<D
230243
Ok(l2_client)
231244
}
232245

233-
pub async fn check_token_readiness(l2_rpc_url: String, l2_chain_id: u64) -> anyhow::Result<()> {
246+
pub async fn check_token_readiness(
247+
l2_rpc_url: String,
248+
l2_chain_id: u64,
249+
l2_tokens_indexing_block_range: Option<u64>,
250+
) -> anyhow::Result<()> {
234251
let l2_client = get_zk_client(&l2_rpc_url, l2_chain_id)?;
235252

236253
check_l2_ntv_existence(&l2_client).await?;
237254

238-
let provider = get_ethers_provider(l2_rpc_url)?;
239-
240-
let all_tokens = get_all_tokens(&l2_client).await?;
255+
let provider = get_ethers_provider(&l2_rpc_url)?;
241256

242257
let l2_native_token_vault =
243258
L2NativeTokenVaultAbi::new(L2_NATIVE_TOKEN_VAULT_ADDRESS, provider.clone());
@@ -247,18 +262,23 @@ pub async fn check_token_readiness(l2_rpc_url: String, l2_chain_id: u64) -> anyh
247262
return Ok(());
248263
}
249264

265+
let all_tokens = get_deployed_by_bridge(
266+
&l2_rpc_url,
267+
l2_legacy_shared_bridge_addr,
268+
l2_tokens_indexing_block_range.unwrap_or(DEFAULT_BLOCK_RANGE),
269+
)
270+
.await?;
271+
250272
let l2_legacy_shared_bridge =
251273
L2LegacySharedBridgeAbi::new(l2_legacy_shared_bridge_addr, provider);
252274

253275
for token in all_tokens {
254-
let current_asset_id = l2_native_token_vault.asset_id(token.l2_address).await?;
255-
// Let's double check whether the token can be registered at all
256-
let l1_address = l2_legacy_shared_bridge
257-
.l_1_token_address(token.l2_address)
258-
.await?;
276+
let current_asset_id = l2_native_token_vault.asset_id(token).await?;
277+
// Let's double check whether the token is a valid legacy token
278+
let l1_address = l2_legacy_shared_bridge.l_1_token_address(token).await?;
259279

260280
if current_asset_id == [0u8; 32] && l1_address != Address::zero() {
261-
anyhow::bail!("There are unregistered L2 tokens! (E.g. {} (address {:#?})). Please register them to smoother migration for your users.", token.name, token.l2_address)
281+
anyhow::bail!("There are unregistered L2 tokens! (E.g. {:#?}). Please register them to smoother migration for your users.", token)
262282
}
263283
}
264284

@@ -269,8 +289,14 @@ pub async fn check_chain_readiness(
269289
l1_rpc_url: String,
270290
l2_rpc_url: String,
271291
l2_chain_id: u64,
292+
l2_tokens_indexing_block_range: Option<u64>,
272293
) -> anyhow::Result<()> {
273-
check_token_readiness(l2_rpc_url.clone(), l2_chain_id).await?;
294+
check_token_readiness(
295+
l2_rpc_url.clone(),
296+
l2_chain_id,
297+
l2_tokens_indexing_block_range,
298+
)
299+
.await?;
274300

275301
let l1_provider = match Provider::<Http>::try_from(&l1_rpc_url) {
276302
Ok(provider) => provider,
@@ -825,6 +851,7 @@ pub struct GatewayUpgradeCalldataArgs {
825851
dangerous_no_cross_check: Option<bool>,
826852
#[clap(long, default_missing_value = "false")]
827853
force_display_finalization_params: Option<bool>,
854+
l2_tokens_indexing_block_range: Option<u64>,
828855
}
829856

830857
pub struct GatewayUpgradeArgsInner {
@@ -920,6 +947,7 @@ pub(crate) async fn run(shell: &Shell, args: GatewayUpgradeCalldataArgs) -> anyh
920947
args.l1_rpc_url.clone(),
921948
args.l2_rpc_url.clone(),
922949
args.chain_id,
950+
args.l2_tokens_indexing_block_range,
923951
)
924952
.await;
925953

0 commit comments

Comments
 (0)