Skip to content

Commit d7b9810

Browse files
committed
update re-uploader
1 parent e19078b commit d7b9810

File tree

10 files changed

+1972
-1835
lines changed

10 files changed

+1972
-1835
lines changed

Cargo.lock

+1,819-1,818
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

icda-core/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ backon = { workspace = true }
2323
rand = { workspace = true }
2424
tracing = { workspace = true }
2525
futures = { workspace = true }
26+
regex = "1.10.5"
2627

2728

2829
[features]

icda-core/src/backup.rs

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use crate::canister_interface::storage::BlobChunk;
2+
use crate::icda::ICDA;
3+
use candid::Principal;
4+
use regex::Regex;
5+
use std::path::PathBuf;
6+
use std::sync::Arc;
7+
use tokio::fs::{File, ReadDir};
8+
use tokio::io::AsyncReadExt;
9+
10+
/// Placeholder for a cycle-monitoring task.
///
/// # Panics
///
/// Always panics when the returned future is polled: the body is not
/// implemented yet.
pub async fn cycle_monitor() {
    unimplemented!()
}
13+
14+
const BACKUP_PATH: &str = "backup";
15+
16+
pub struct ReUploader {
17+
backup: ReadDir,
18+
icda: Arc<ICDA>,
19+
}
20+
21+
impl ReUploader {
22+
pub async fn new(icda: ICDA) -> Self {
23+
// create backup dir
24+
tokio::fs::create_dir(BACKUP_PATH)
25+
.await
26+
.expect("failed to create backup dir");
27+
28+
let backup = tokio::fs::read_dir(BACKUP_PATH)
29+
.await
30+
.expect("failed to read backup dir");
31+
32+
let icda = Arc::new(icda);
33+
34+
Self { backup, icda }
35+
}
36+
37+
// cycling monitor backup file and re-upload the failed chunks
38+
pub async fn monitor(&mut self) {
39+
loop {
40+
match self.backup.next_entry().await {
41+
Ok(entry) => match entry {
42+
Some(entry) => {
43+
tracing::warn!("ICDA ReUploader: monitor: catch file {:?}", entry.path());
44+
let path = entry.path();
45+
let fut = ReUploader::reupload(self.icda.clone(), path);
46+
tokio::spawn(fut);
47+
}
48+
None => {
49+
tracing::info!("ICDA ReUploader: monitor: no files to reupload");
50+
}
51+
},
52+
Err(e) => {
53+
tracing::error!(
54+
"ICDA ReUploader: monitor: failed to read backup dir: {:?}",
55+
e
56+
);
57+
}
58+
}
59+
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
60+
}
61+
}
62+
63+
//todo: 可以不用每次都传path, 以及将这个模块放到icda里面
64+
// 以及将对chunk的deserialize以及命名放到这个模块
65+
async fn reupload(icda: Arc<ICDA>, path: PathBuf) {
66+
let mut file = File::open(&path).await.expect("failed to open file");
67+
let mut buffer = Vec::new();
68+
file.read_to_end(&mut buffer)
69+
.await
70+
.expect("failed to read file");
71+
72+
// file's name is : chunk-{canister_id}-{chunk.index}.bin
73+
let canister_id = parse_canister_id_from_file_name(path);
74+
let sc = icda
75+
.storage_canisters_map
76+
.get(&canister_id)
77+
.expect("failed to get canister")
78+
.clone();
79+
80+
let chunk: BlobChunk = bincode::deserialize(&buffer).expect("failed to deserialize");
81+
82+
loop {
83+
match sc.save_blob(&chunk).await {
84+
Ok(_) => {
85+
tracing::info!(
86+
"ICDA ReUploader: reupload success, canister id: {}, chunk index: {}",
87+
canister_id.to_text(),
88+
chunk.index
89+
);
90+
break;
91+
}
92+
Err(e) => {
93+
tracing::error!(
94+
"ICDA ReUploader: reupload failed: canister id: {}, chunk index: {}. error: {:?}, retry after 60s",
95+
canister_id.to_text(),
96+
chunk.index,
97+
e
98+
);
99+
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
100+
}
101+
}
102+
}
103+
}
104+
}
105+
106+
fn parse_canister_id_from_file_name(path: PathBuf) -> Principal {
107+
let re = Regex::new(r"chunk_(\d+)_\d+.bin").expect("failed to compile regex");
108+
let canister_id = re
109+
.captures(
110+
path.file_name()
111+
.expect("failed to get file name")
112+
.to_str()
113+
.unwrap(),
114+
)
115+
.expect("failed to get canister id")
116+
.get(1)
117+
.expect("failed to get canister id")
118+
.as_str();
119+
Principal::from_text(canister_id).expect("failed to parse canister id")
120+
}

icda-core/src/canister_interface/storage.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -146,15 +146,16 @@ impl StorageCanister {
146146
}
147147

148148
pub async fn save_blob(&self, chunk: &BlobChunk) -> anyhow::Result<()> {
149-
let arg = Encode!(&chunk)?;
149+
let arg = Encode!(chunk)?;
150150
let raw_response = self
151151
.agent
152152
.update_call(&self.canister_id, "save_blob", arg)
153153
.await?;
154154
let response = Decode!(&raw_response, Result<(), String>)?;
155155
if let Err(e) = response {
156-
bail!("failed to save blob: {}", e)
156+
bail!("storage canister: save blob: failed to save blob: {}", e)
157157
}
158+
158159
Ok(())
159160
}
160161

icda-core/src/icda.rs

+27-10
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
use crate::backup::ReUploader;
12
use crate::canister_interface::rr_agent::RoundRobinAgent;
23
use crate::canister_interface::signature::{ConfirmationStatus, SignatureCanister};
34
use crate::canister_interface::storage::{BlobChunk, RoutingInfo, StorageCanister};
45
use anyhow::bail;
56
use anyhow::Result;
67
use candid::{Deserialize, Principal};
8+
use futures::executor::block_on;
79
use ic_agent::identity::BasicIdentity;
810
use rand::random;
911
use serde::Serialize;
@@ -44,6 +46,8 @@ pub const CANISTER_COLLECTIONS: [[&str; REPLICA_NUM]; COLLECTION_SIZE] = [
4446
["r2xtu-uiaaa-aaaag-alf6q-cai"], // lspz2-jx4pu-k3e7p-znm7j-q4yum-ork6e-6w4q6-pijwq-znehu-4jabe-kqe
4547
];
4648

49+
const RETRY_TIMES: usize = 3;
50+
4751
#[derive(Serialize, Deserialize, Clone, Default)]
4852
pub struct BlobKey {
4953
pub digest: [u8; 32],
@@ -95,23 +99,34 @@ impl ICDA {
9599
info!("ICDA::new(): signature canister init success");
96100
}
97101
Err(e) => {
98-
warn!("ICDA::new(): signature canister init failed, error: {:?}, retry after 5 seconds",e);
99-
tokio::time::sleep(Duration::from_secs(5)).await;
102+
bail!(
103+
"ICDA::new(): signature canister init failed, error: {:?}",
104+
e
105+
);
100106
}
101107
}
102108
}
103109
}
104110

105111
let canister_collection_index = Arc::new(Mutex::new(random::<usize>() % COLLECTION_SIZE));
106112

107-
// create backup dir
108-
let _ = tokio::fs::create_dir("backup").await;
109-
110-
Ok(Self {
113+
let _self = Self {
111114
canister_collection_index,
112115
storage_canisters_map,
113116
signature_canister,
114-
})
117+
};
118+
119+
// create backup thread
120+
let _icda = _self.clone();
121+
let backup_thread = async move {
122+
let mut reuploader = ReUploader::new(_icda).await;
123+
reuploader.monitor().await;
124+
};
125+
tokio::task::spawn_blocking(move || {
126+
block_on(backup_thread);
127+
});
128+
129+
Ok(_self)
115130
}
116131

117132
pub async fn push_blob_to_canisters(&self, blob: Vec<u8>) -> Result<BlobKey> {
@@ -299,13 +314,13 @@ impl ICDA {
299314

300315
impl ICDA {
301316
// push chunks to a single canister
302-
async fn push_chunks_to_canister(
317+
pub(crate) async fn push_chunks_to_canister(
303318
sc: StorageCanister,
304319
chunks: Arc<Vec<BlobChunk>>,
305320
) -> Result<()> {
306321
for chunk in chunks.iter() {
307322
// simple re-upload
308-
for i in 0..3 {
323+
for i in 0..RETRY_TIMES {
309324
if let Err(e) = sc.save_blob(chunk).await {
310325
warn!(
311326
"ICDA::save_blob_chunk(): cid: {}, error: {:?}, retry after 5 seconds",
@@ -314,7 +329,9 @@ impl ICDA {
314329
);
315330
if i == 2 {
316331
// save chunks into local storage
317-
let serialized = bincode::serialize(&chunk).unwrap();
332+
let serialized = bincode::serialize(chunk).unwrap();
333+
334+
// 放到icda 的 reuploader中
318335
let _ = tokio::fs::write(
319336
format!(
320337
"backup/chunk_{}_{}.bin",

icda-core/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1+
mod backup;
12
pub mod canister_interface;
23
pub mod icda;

server/src/canister_service.rs

-3
This file was deleted.

server/src/lib.rs

-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
pub mod canister_service;
21
pub mod storage;
32
pub mod disperser {
43
#![allow(clippy::all)]

server/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ async fn main() -> Result<()> {
231231

232232
// check if cmd is IC
233233
if is_icda {
234-
tokio::time::sleep(Duration::from_secs(10)).await
234+
tokio::time::sleep(Duration::from_secs(20)).await
235235
}
236236

237237
let _ret = storage.get_blob(id).await.unwrap();

test/server-monitor.png

428 KB
Loading

0 commit comments

Comments
 (0)