@@ -151,23 +151,24 @@ pub struct Engine {
151
151
}
152
152
153
153
struct DownloadContext < ' a , T > {
154
+ engine : & ' a Engine ,
154
155
client : Arc < dyn FullNodeOverlayClient > ,
155
- db : & ' a InternalDb ,
156
156
downloader : Arc < dyn Downloader < Item = T > > ,
157
157
id : & ' a BlockIdExt ,
158
158
limit : Option < u32 > ,
159
159
log_error_limit : u32 ,
160
160
name : & ' a str ,
161
161
timeout : Option < ( u64 , u64 , u64 ) > , // (current, multiplier*10, max)
162
- #[ cfg( feature = "telemetry" ) ]
163
- full_node_telemetry : & ' a FullNodeTelemetry ,
164
162
}
165
163
166
164
impl < T > DownloadContext < ' _ , T > {
167
165
168
166
async fn download ( & mut self ) -> Result < T > {
169
167
let mut attempt = 1 ;
170
168
loop {
169
+ if self . engine . check_stop ( ) {
170
+ fail ! ( "{} id: {}, stop flag was set" , self . name, self . id) ;
171
+ }
171
172
match self . downloader . try_download ( self ) . await {
172
173
Err ( e) => self . log ( format ! ( "{}" , e) . as_str ( ) , attempt) ,
173
174
Ok ( ret) => break Ok ( ret)
@@ -219,10 +220,10 @@ impl Downloader for BlockDownloader {
219
220
& self ,
220
221
context : & DownloadContext < ' _ , Self :: Item > ,
221
222
) -> Result < Self :: Item > {
222
- if let Some ( handle) = context. db . load_block_handle ( context. id ) ? {
223
+ if let Some ( handle) = context. engine . db . load_block_handle ( context. id ) ? {
223
224
let mut is_link = false ;
224
225
if handle. has_data ( ) && handle. has_proof_or_link ( & mut is_link) {
225
- let block = match context. db . load_block_data ( & handle) . await {
226
+ let block = match context. engine . db . load_block_data ( & handle) . await {
226
227
Err ( e) => if !handle. has_data ( ) {
227
228
None
228
229
} else {
@@ -233,7 +234,7 @@ impl Downloader for BlockDownloader {
233
234
let proof = if block. is_none ( ) {
234
235
None
235
236
} else {
236
- match context. db . load_block_proof ( & handle, is_link) . await {
237
+ match context. engine . db . load_block_proof ( & handle, is_link) . await {
237
238
Err ( e) => if is_link && !handle. has_proof_link ( ) {
238
239
None
239
240
} else if !is_link && !handle. has_proof ( ) {
@@ -252,11 +253,11 @@ impl Downloader for BlockDownloader {
252
253
}
253
254
}
254
255
#[ cfg( feature = "telemetry" ) ]
255
- context. full_node_telemetry . new_downloading_block_attempt ( context. id ) ;
256
+ context. engine . full_node_telemetry . new_downloading_block_attempt ( context. id ) ;
256
257
let ret = context. client . download_block_full ( context. id ) . await ;
257
258
#[ cfg( feature = "telemetry" ) ]
258
259
if ret. is_ok ( ) {
259
- context. full_node_telemetry . new_downloaded_block ( context. id ) ;
260
+ context. engine . full_node_telemetry . new_downloaded_block ( context. id ) ;
260
261
}
261
262
ret
262
263
}
@@ -274,10 +275,10 @@ impl Downloader for BlockProofDownloader {
274
275
& self ,
275
276
context : & DownloadContext < ' _ , Self :: Item > ,
276
277
) -> Result < Self :: Item > {
277
- if let Some ( handle) = context. db . load_block_handle ( context. id ) ? {
278
+ if let Some ( handle) = context. engine . db . load_block_handle ( context. id ) ? {
278
279
let mut is_link = false ;
279
280
if handle. has_proof_or_link ( & mut is_link) {
280
- return Ok ( context. db . load_block_proof ( & handle, is_link) . await ?) ;
281
+ return Ok ( context. engine . db . load_block_proof ( & handle, is_link) . await ?) ;
281
282
}
282
283
}
283
284
context. client . download_block_proof (
@@ -297,15 +298,15 @@ impl Downloader for NextBlockDownloader {
297
298
& self ,
298
299
context : & DownloadContext < ' _ , Self :: Item > ,
299
300
) -> Result < Self :: Item > {
300
- if let Some ( prev_handle) = context. db . load_block_handle ( context. id ) ? {
301
+ if let Some ( prev_handle) = context. engine . db . load_block_handle ( context. id ) ? {
301
302
if prev_handle. has_next1 ( ) {
302
- let next_id = context. db . load_block_next1 ( context. id ) ?;
303
- if let Some ( next_handle) = context. db . load_block_handle ( & next_id) ? {
303
+ let next_id = context. engine . db . load_block_next1 ( context. id ) ?;
304
+ if let Some ( next_handle) = context. engine . db . load_block_handle ( & next_id) ? {
304
305
let mut is_link = false ;
305
306
if next_handle. has_data ( ) && next_handle. has_proof_or_link ( & mut is_link) {
306
307
return Ok ( (
307
- context. db . load_block_data ( & next_handle) . await ?,
308
- context. db . load_block_proof ( & next_handle, is_link) . await ?
308
+ context. engine . db . load_block_data ( & next_handle) . await ?,
309
+ context. engine . db . load_block_proof ( & next_handle, is_link) . await ?
309
310
) ) ;
310
311
}
311
312
}
@@ -324,9 +325,9 @@ impl Downloader for ZeroStateDownloader {
324
325
& self ,
325
326
context : & DownloadContext < ' _ , Self :: Item > ,
326
327
) -> Result < Self :: Item > {
327
- if let Some ( handle) = context. db . load_block_handle ( context. id ) ? {
328
+ if let Some ( handle) = context. engine . db . load_block_handle ( context. id ) ? {
328
329
if handle. has_state ( ) {
329
- let zs = context. db . load_shard_state_dynamic ( context. id ) ?;
330
+ let zs = context. engine . db . load_shard_state_dynamic ( context. id ) ?;
330
331
let mut data = vec ! ( ) ;
331
332
zs. write_to ( & mut data) ?;
332
333
return Ok ( ( zs, data) ) ;
@@ -1306,15 +1307,13 @@ impl Engine {
1306
1307
id. shard ( ) . workchain_id ( ) ,
1307
1308
id. shard ( ) . shard_prefix_with_tag ( )
1308
1309
) . await ?,
1309
- db : self . db . deref ( ) ,
1310
+ engine : self ,
1310
1311
downloader,
1311
1312
id,
1312
1313
limit,
1313
1314
log_error_limit,
1314
1315
name,
1315
1316
timeout,
1316
- #[ cfg( feature = "telemetry" ) ]
1317
- full_node_telemetry : self . full_node_telemetry ( ) ,
1318
1317
} ;
1319
1318
Ok ( ret)
1320
1319
}
@@ -1546,78 +1545,70 @@ impl Engine {
1546
1545
1547
1546
async fn check_gc_for_archives (
1548
1547
engine : & Arc < Engine > ,
1549
- curr_block_handle : & Arc < BlockHandle > ,
1548
+ last_keyblock : & Arc < BlockHandle > ,
1550
1549
mc_state : & ShardStateStuff
1551
1550
) -> Result < ( ) > {
1552
- let mut prev_pss_block = None ;
1553
- let mut prev_prev_pss_block = None ;
1554
- let mut handle = curr_block_handle. clone ( ) ;
1555
- let mut check_date = std:: time:: SystemTime :: now ( ) . duration_since ( std:: time:: UNIX_EPOCH ) ?;
1556
-
1551
+ let mut gc_max_date = std:: time:: SystemTime :: now ( ) . duration_since ( std:: time:: UNIX_EPOCH ) ?;
1557
1552
match & engine. archives_life_time {
1558
1553
None => return Ok ( ( ) ) ,
1559
1554
Some ( life_time) => {
1560
- match check_date . checked_sub ( Duration :: from_secs ( ( life_time * 3600 ) as u64 ) ) {
1555
+ match gc_max_date . checked_sub ( Duration :: from_secs ( ( life_time * 3600 ) as u64 ) ) {
1561
1556
Some ( date) => {
1562
1557
log:: info!( "archive gc: checked date {}." , & date. as_secs( ) ) ;
1563
- check_date = date
1558
+ gc_max_date = date
1564
1559
} ,
1565
1560
None => {
1566
1561
log:: info!( "archive gc: life_time in config is bad, actual checked date: {}" ,
1567
- & check_date . as_secs( )
1562
+ & gc_max_date . as_secs( )
1568
1563
) ;
1569
1564
}
1570
1565
}
1571
1566
}
1572
1567
}
1573
1568
1569
+ let mut visited_pss_blocks = 0 ;
1570
+ let mut keyblock = last_keyblock. clone ( ) ;
1571
+ let prev_blocks = & mc_state. shard_state_extra ( ) ?. prev_blocks ;
1574
1572
loop {
1575
- if handle. id ( ) . seq_no ( ) == 0 {
1576
- return Ok ( ( ) ) ;
1577
- }
1578
-
1579
- if let Some ( prev_key_block_id) =
1580
- mc_state. shard_state_extra ( ) ?. prev_blocks . get_prev_key_block ( handle. id ( ) . seq_no ( ) - 1 ) ? {
1581
-
1582
- let block_id = BlockIdExt {
1583
- shard_id : ShardIdent :: masterchain ( ) ,
1584
- seq_no : prev_key_block_id. seq_no ,
1585
- root_hash : prev_key_block_id. root_hash ,
1586
- file_hash : prev_key_block_id. file_hash
1587
- } ;
1588
- let prev_handle = engine. load_block_handle ( & block_id) ?. ok_or_else (
1589
- || error ! ( "Cannot load handle for PSS keeper prev key block {}" , block_id)
1590
- ) ?;
1591
- if engine. is_persistent_state ( curr_block_handle. gen_utime ( ) ?,
1592
- prev_handle. gen_utime ( ) ?, boot:: PSS_PERIOD_BITS ) {
1593
- prev_prev_pss_block = prev_pss_block;
1594
- prev_pss_block = Some ( prev_handle. clone ( ) ) ;
1595
- }
1596
- handle = prev_handle;
1597
-
1598
- if let Some ( pss_block) = & prev_prev_pss_block {
1599
- let gen_time = pss_block. gen_utime ( ) ? as u64 ;
1600
- let check_date = check_date. as_secs ( ) ;
1601
- if gen_time < check_date {
1602
- log:: info!(
1603
- "gc for archives: found block (gen time: {}, seq_no: {}), check date: {}" ,
1604
- & gen_time, pss_block. id( ) . seq_no( ) , & check_date
1605
- ) ;
1606
- break ;
1573
+ match prev_blocks. get_prev_key_block ( keyblock. id ( ) . seq_no ( ) - 1 ) ? {
1574
+ None => return Ok ( ( ) ) ,
1575
+ Some ( prev_keyblock) => {
1576
+ let prev_keyblock = BlockIdExt :: from_ext_blk ( prev_keyblock) ;
1577
+ let prev_keyblock = engine. load_block_handle ( & prev_keyblock) ?. ok_or_else (
1578
+ || error ! ( "Cannot load handle for PSS keeper prev key block {}" , prev_keyblock)
1579
+ ) ?;
1580
+ if engine. is_persistent_state (
1581
+ keyblock. gen_utime ( ) ?, prev_keyblock. gen_utime ( ) ?, boot:: PSS_PERIOD_BITS
1582
+ ) {
1583
+ visited_pss_blocks += 1 ;
1584
+
1585
+ // Due to boot process specific (pss period and key_block_utime_step combinations)
1586
+ // we shouldn't delete last 4 pss blocks
1587
+ // ....................pss_block....pss_block....pss_block....pss_block...
1588
+ // visited_pss_blocks: 4 3 2 1
1589
+ // ↑ we may delete blocks starting at least here (before 4th pss)
1590
+ if visited_pss_blocks >= 4 {
1591
+ let gen_time = keyblock. gen_utime ( ) ? as u64 ;
1592
+ let gc_max_date = gc_max_date. as_secs ( ) ;
1593
+ if gen_time < gc_max_date {
1594
+ log:: info!(
1595
+ "gc for archives: found block (gen time: {}, seq_no: {}), gc max date: {}" ,
1596
+ & gen_time, keyblock. id( ) . seq_no( ) , & gc_max_date
1597
+ ) ;
1598
+ log:: info!( "start gc for archives.." ) ;
1599
+ engine. db . archive_manager ( ) . gc ( & keyblock. id ( ) ) . await ;
1600
+ log:: info!( "finish gc for archives." ) ;
1601
+ return Ok ( ( ) ) ;
1602
+ }
1603
+ }
1604
+ }
1605
+ if prev_keyblock. id ( ) . seq_no ( ) == 0 {
1606
+ return Ok ( ( ) ) ;
1607
1607
}
1608
+ keyblock = prev_keyblock;
1608
1609
}
1609
- } else {
1610
- return Ok ( ( ) ) ;
1611
1610
}
1612
1611
}
1613
-
1614
- if let Some ( gc_marked_block) = & prev_prev_pss_block {
1615
- log:: info!( "start gc for archives.." ) ;
1616
- engine. db . archive_manager ( ) . gc ( & gc_marked_block. id ( ) ) . await ;
1617
- log:: info!( "finish gc for archives." ) ;
1618
- }
1619
-
1620
- Ok ( ( ) )
1621
1612
}
1622
1613
1623
1614
fn check_finish_sync ( self : Arc < Self > ) {
0 commit comments