Skip to content

Commit

Permalink
add log better & add method to jsonrpc result
Browse files Browse the repository at this point in the history
  • Loading branch information
sunhuachuang committed May 17, 2020
1 parent b6bb5ff commit 03222ec
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 25 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ toml = "0.5"
sha3 = "0.8"
rand = "0.7"
dirs = "2"
log = "0.4"

# JSON-RPC
# tide = "0.5"
Expand Down
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,5 +337,6 @@ impl RawConfig {

async fn load_file_string(mut path: PathBuf) -> Result<String> {
path.push(CONFIG_FILE_NAME);
debug!("DEBUG-TDN: config file: {:?}", path);
read_string_absolute_file(&path).await
}
16 changes: 8 additions & 8 deletions src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ async fn run_receiver(
select! {
msg = out_recv.recv().fuse() => match msg {
Some(msg) => {
println!("DEBUG: recv from outside: {:?}", msg);
debug!("DEBUG: recv from outside: {:?}", msg);
match msg {
LayerSendMessage::Upper(gid, data) => {
uppers.get(&gid).map(|h| {
Expand Down Expand Up @@ -264,7 +264,7 @@ async fn run_receiver(
Some(msg) => {
match msg {
StreamMessage::Open(gid, remote_gid, uid, addr, sender, is_upper) => {
println!("DEBUG: layer: {}, uid: {} open ok!", remote_gid.short_show(), uid);
debug!("DEBUG: layer: {}, uid: {} open ok!", remote_gid.short_show(), uid);
if !layers.contains_key(&gid) {
continue;
}
Expand Down Expand Up @@ -350,14 +350,14 @@ async fn process_stream(
server_send: Sender<StreamMessage>,
) -> Result<()> {
let is_upper = has_remote_public.is_some();
println!("DEBUG: start process stream");
debug!("DEBUG: start process stream");
let addr = stream.peer_addr()?;
let (mut reader, mut writer) = &mut (&stream, &stream);

// if is to upper, send self-info first.
if is_upper {
let remote_public_bytes = has_remote_public.unwrap().to_bytes();
println!("DEBUG: send remote by self");
debug!("DEBUG: send remote by self");
let len = remote_public_bytes.len() as u32;
writer.write(&(len.to_be_bytes())).await?;
writer.write_all(&remote_public_bytes[..]).await?;
Expand Down Expand Up @@ -387,13 +387,13 @@ async fn process_stream(
.await;

if result.is_err() {
println!("Debug: Session timeout");
debug!("Debug: Session timeout");
return Ok(());
}

let result = result.unwrap();
if result.is_none() {
println!("Debug: Session invalid pk");
debug!("Debug: Session invalid pk");
return Ok(());
}

Expand Down Expand Up @@ -448,7 +448,7 @@ async fn process_stream(
writer.write_all(&bytes[..]).await?;
}
StreamMessage::Ok(remote_public) => {
println!("DEBUG: send remote after verify");
debug!("DEBUG: send remote after verify");
let remote_public_bytes = remote_public.to_bytes();
let len = remote_public_bytes.len() as u32;
writer.write(&(len.to_be_bytes())).await?;
Expand All @@ -462,7 +462,7 @@ async fn process_stream(
}
}

println!("DEBUG: close layers: {}", addr);
debug!("DEBUG: close layers: {}", addr);
server_send
.send(StreamMessage::Close(gid, uid, is_upper))
.await;
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#![recursion_limit = "1024"]
#![feature(associated_type_defaults)]

#[macro_use]
extern crate log;

mod config;
mod layer;
mod message;
Expand Down
10 changes: 5 additions & 5 deletions src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ pub(crate) async fn start<M: 'static + GroupMessage>(
) -> Result<(PeerId, Sender<GroupSendMessage>)> {
let (self_send, self_recv) = new_send_channel();

println!("DEBUG: P2P listening: {}", config.addr);
debug!("DEBUG: P2P listening: {}", config.addr);

// start chamomile
let (peer_id, p2p_send, p2p_recv) = p2p_start(config).await?;
println!("p2p service started");
debug!("p2p service started");

task::spawn(run_listen(out_send, p2p_send, p2p_recv, self_recv));
println!("p2p channel service started");
debug!("p2p channel service started");

Ok((peer_id, self_send))
}
Expand Down Expand Up @@ -61,7 +61,7 @@ async fn run_listen<M: GroupMessage>(
)).await;
},
ReceiveMessage::Data(peer_addr, data) => {
println!("DEBUG: P2P Event Length: {}", data.len());
debug!("DEBUG: P2P Event Length: {}", data.len());
out_send.send(M::new_group(
GroupReceiveMessage::Event(peer_addr, data)
)).await;
Expand Down Expand Up @@ -90,7 +90,7 @@ async fn run_listen<M: GroupMessage>(
p2p_send.send(SendMessage::DisConnect(addr)).await;
},
GroupSendMessage::Event(peer_addr, data) => {
println!("DEBUG: Outside Event Length: {}", data.len());
debug!("DEBUG: Outside Event Length: {}", data.len());
p2p_send.send(SendMessage::Data(peer_addr, data)).await;
},
GroupSendMessage::Broadcast(broadcast, data) => {
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async fn http_connection(
raw_stream: TcpStream,
addr: SocketAddr,
) -> Result<()> {
println!("processing jsonrpc request.");
debug!("processing jsonrpc request.");
let (mut reader, mut writer) = &mut (&raw_stream, &raw_stream);

//io::timeout(Duration::from_secs(5), async { () }).await;
Expand Down
36 changes: 26 additions & 10 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,16 @@ async fn server(send: Sender<RpcMessage>, config: RpcConfig) -> Result<()> {
}

#[derive(Debug, Clone)]
pub enum RpcError {
pub enum RpcError<'a> {
ParseError,
InvalidRequest,
InvalidVersion,
InvalidResponse,
MethodNotFound(String),
MethodNotFound(&'a str),
Custom(&'a str),
}

impl RpcError {
impl<'a> RpcError<'a> {
pub fn json(&self, id: u64) -> RpcParam {
match self {
RpcError::ParseError => json!({
Expand Down Expand Up @@ -174,11 +175,21 @@ impl RpcError {
"message": "Invalid Response"
}
}),
RpcError::Custom(m) => json!({
"jsonrpc": "2.0",
"id": id,
"error": {
"code": -32600,
"message": m
}
}),
}
}
}

fn parse_jsonrpc(json_string: String) -> std::result::Result<(RpcParam, u64), (RpcError, u64)> {
fn parse_jsonrpc<'a>(
json_string: String,
) -> std::result::Result<(RpcParam, u64), (RpcError<'a>, u64)> {
match serde_json::from_str::<RpcParam>(&json_string) {
Ok(mut value) => {
let id_res = value
Expand Down Expand Up @@ -257,11 +268,11 @@ fn parse_jsonrpc(json_string: String) -> std::result::Result<(RpcParam, u64), (R
/// ````
pub struct RpcHandler<S> {
state: Arc<S>,
fns: HashMap<String, Box<dyn Fn(Vec<RpcParam>, Arc<S>) -> RpcFut>>,
fns: HashMap<String, Box<dyn Fn(Vec<RpcParam>, Arc<S>) -> RpcFut<'static>>>,
}

type RpcResult = std::result::Result<RpcParam, RpcError>;
type RpcFut = LocalBoxFuture<'static, RpcResult>;
type RpcResult<'a> = std::result::Result<RpcParam, RpcError<'a>>;
type RpcFut<'a> = LocalBoxFuture<'static, RpcResult<'a>>;

impl<S> RpcHandler<S> {
pub fn new(state: S) -> RpcHandler<S> {
Expand All @@ -271,7 +282,7 @@ impl<S> RpcHandler<S> {
}
}

pub fn add_method<F: 'static + Fn(Vec<RpcParam>, Arc<S>) -> RpcFut>(
pub fn add_method<F: 'static + Fn(Vec<RpcParam>, Arc<S>) -> RpcFut<'static>>(
&mut self,
name: &str,
f: F,
Expand All @@ -291,12 +302,17 @@ impl<S> RpcHandler<S> {
Ok(params) => json!({
"jsonrpc": "2.0",
"id": id,
"method": method,
"result": params,
}),
Err(err) => err.json(id),
Err(err) => {
let mut res = err.json(id);
res["method"] = method.into();
res
}
}
}
None => RpcError::MethodNotFound(method.to_owned()).json(id),
None => RpcError::MethodNotFound(method).json(id),
}
} else {
RpcError::InvalidRequest.json(id)
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn ws_connection(
let ws_stream = accept_async(raw_stream)
.await
.map_err(|_e| Error::new(ErrorKind::Other, "Accept WebSocket Failure!"))?;
println!("DEBUG: WebSocket connection established: {}", addr);
debug!("DEBUG: WebSocket connection established: {}", addr);
let id: u64 = rand::thread_rng().gen();
let (s_send, mut s_recv) = rpc_channel();
send.send(RpcMessage::Open(id, s_send)).await;
Expand Down

0 comments on commit 03222ec

Please sign in to comment.