Skip to content

Commit

Permalink
add basic transport for layer
Browse files Browse the repository at this point in the history
  • Loading branch information
sunhuachuang committed Jan 8, 2020
1 parent 5b34e28 commit 1c9a41a
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 42 deletions.
7 changes: 6 additions & 1 deletion config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ p2p_bootstrap = []
## Layer define if others can use it as upper layer. if false, only whitelist can use. default is true.
#layer_lower = true

## Layer define upper layer's seed IPs.
## Layer define upper layer's seeds (IP, GroupId).
## Example: layer_upper = [
## { addr = "0.0.0.0:7000", group_id = "0000000000000000000000000000000000000000000000000000000000000000" },
## { addr = "0.0.0.0:7001", group_id = "1111111111111111111111111111111111111111111111111111111111111111" },
## ]
layer_upper = []


## Layer Whitelist(IP), uncomment below to change.
## Example: layer_white_list = ["1.1.1.1:7000", "192.168.0.1.:7000"]
#layer_white_list = []
Expand Down
15 changes: 11 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ impl Config {
}
}

#[derive(Serialize, Deserialize, Debug)]
pub struct RawUpper {
addr: SocketAddr,
group_id: String,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct RawConfig {
pub p2p_addr: Option<SocketAddr>,
Expand All @@ -145,7 +151,7 @@ pub struct RawConfig {

pub layer_addr: Option<SocketAddr>,
pub layer_lower: Option<bool>,
pub layer_upper: Option<Vec<(SocketAddr, String)>>,
pub layer_upper: Option<Vec<RawUpper>>,
pub layer_white_list: Option<Vec<SocketAddr>>,
pub layer_black_list: Option<Vec<SocketAddr>>,
pub layer_white_group_list: Option<Vec<String>>,
Expand Down Expand Up @@ -204,10 +210,11 @@ impl RawConfig {
.layer_upper
.map(|ss| {
ss.iter()
.map(|(a, s)| {
.map(|RawUpper { addr, group_id }| {
(
*a,
GroupId::from_hex(s).expect("invalid group id in layer upper"),
*addr,
GroupId::from_hex(group_id)
.expect("invalid group id in layer upper"),
)
})
.collect()
Expand Down
196 changes: 160 additions & 36 deletions src/layer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use async_std::{
io,
io::Result,
net::{TcpListener, TcpStream},
prelude::*,
sync::{channel, Receiver, Sender},
task,
};
use futures::{select, FutureExt};
use serde_derive::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::Duration;

use crate::group::GroupId;
use crate::primitive::MAX_MESSAGE_CAPACITY;
Expand All @@ -34,29 +37,49 @@ impl LayerConfig {
}
}

pub(crate) async fn start(config: LayerConfig, send: Sender<Message>) -> Result<Sender<Message>> {
pub(crate) async fn start(
gid: GroupId,
config: LayerConfig,
send: Sender<Message>,
) -> Result<Sender<Message>> {
let (out_send, out_recv) = new_channel();
let (self_send, self_recv) = channel::<StreamMessage>(MAX_MESSAGE_CAPACITY);

if config.is_close() {
return Ok(out_send);
}

let remote_public = RemotePublic(gid, vec![]);

let listener = TcpListener::bind(config.addr).await?;
task::spawn(run_listener(listener, send.clone(), self_send.clone()));
task::spawn(run_receiver(config, out_recv, send, self_send, self_recv));
task::spawn(run_listener(
remote_public.clone(),
listener,
send.clone(),
self_send.clone(),
));
task::spawn(run_receiver(
remote_public,
config,
out_recv,
send,
self_send,
self_recv,
));

Ok(out_send)
}

async fn run_listener(
remote_public: RemotePublic,
listener: TcpListener,
send: Sender<Message>,
self_send: Sender<StreamMessage>,
) -> Result<()> {
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
task::spawn(process_stream(
remote_public.clone(),
stream?,
send.clone(),
self_send.clone(),
Expand All @@ -70,21 +93,25 @@ async fn run_listener(
}

async fn run_client(
remote_public: RemotePublic,
addr: SocketAddr,
bytes: Vec<u8>,
send: Sender<Message>,
self_send: Sender<StreamMessage>,
) -> Result<()> {
let mut stream = TcpStream::connect(addr).await?;
let len = bytes.len() as u32;
stream.write(&(len.to_be_bytes())).await?;
stream.write_all(&bytes[..]).await?;
task::spawn(process_stream(stream, send, self_send, true));
let stream = TcpStream::connect(addr).await?;
task::spawn(process_stream(
remote_public.clone(),
stream,
send,
self_send,
true,
));

Ok(())
}

async fn run_receiver(
remote_public: RemotePublic,
config: LayerConfig,
mut out_recv: Receiver<Message>,
send: Sender<Message>,
Expand All @@ -97,8 +124,12 @@ async fn run_receiver(
// link to uppers
for (addr, gid) in config.upper {
uppers.insert(gid, HashMap::new());
// TODO link and join bytes.
task::spawn(run_client(addr, vec![], send.clone(), self_send.clone()));
task::spawn(run_client(
remote_public.clone(),
addr,
send.clone(),
self_send.clone(),
));
}

loop {
Expand All @@ -107,7 +138,9 @@ async fn run_receiver(
Some(msg) => {
println!("recv from outside: {:?}", msg);
match msg {
Message::Upper(gid, data) => {}
Message::Upper(gid, data) => {

}
Message::Lower(gid, data) => {}
_ => {}
}
Expand All @@ -117,8 +150,37 @@ async fn run_receiver(
msg = self_recv.next().fuse() => match msg {
Some(msg) => {
match msg {
StreamMessage::Open(gid, uid, sender) => {},
StreamMessage::Close(gid, uid) => {},
StreamMessage::Open(gid, uid, sender, is_upper) => {
let entry = if is_upper {
&mut uppers
} else {
&mut lowers
};

entry.entry(gid).and_modify(|h| {
h.insert(uid, sender.clone());
}).or_insert({
let mut h = HashMap::new();
h.insert(uid, sender);
h
});

println!("layer: {}, uid: {} open ok!", gid.short_show(), uid);
},
StreamMessage::Close(gid, uid, is_upper) => {
let entry = if is_upper {
&mut uppers
} else {
&mut lowers
};

entry.get_mut(&gid).map(|h| {
h.remove(&uid);
// TODO new link to this entry
});

println!("layer: {}, uid: {} closed!", gid.short_show(), uid);
},
_ => {}
}
}
Expand All @@ -135,31 +197,76 @@ async fn run_receiver(
Ok(())
}

enum StreamMessage {
Open(GroupId, u32, Sender<StreamMessage>),
Close(GroupId, u32),
Data(Vec<u8>),
}

async fn process_stream(
remote_public: RemotePublic,
stream: TcpStream,
sender: Sender<Message>,
server_send: Sender<StreamMessage>,
is_upper: bool,
) -> Result<()> {
println!("DEBUG: start process stream");
let addr = stream.peer_addr()?;

let (mut reader, mut writer) = &mut (&stream, &stream);

let (self_send, self_recv) = channel::<StreamMessage>(MAX_MESSAGE_CAPACITY);
let uid = rand::random::<u32>();
let remote_public_bytes = remote_public.to_bytes();
if is_upper {
println!("DEBUG: send remote by self");
let len = remote_public_bytes.len() as u32;
writer.write(&(len.to_be_bytes())).await?;
writer.write_all(&remote_public_bytes[..]).await?;
}

// timeout 10s to read peer_id & public_key
let result: Result<Option<RemotePublic>> = io::timeout(Duration::from_secs(5), async {
let mut read_len = [0u8; 4];
while let Ok(size) = reader.read(&mut read_len).await {
if size == 0 {
// when close or better when many Ok(0)
break;
}

let len: usize = u32::from_be_bytes(read_len) as usize;
let mut read_bytes = vec![0u8; len];
while let Ok(bytes_size) = reader.read(&mut read_bytes).await {
if bytes_size != len {
break;
}

return Ok(RemotePublic::from_bytes(read_bytes).ok());
}
}
Ok(None)
})
.await;

// TODO read gid
if result.is_err() {
println!("Debug: Session timeout");
return Ok(());
}

let gid = GroupId::default();
let result = result.unwrap();
if result.is_none() {
println!("Debug: Session invalid pk");
return Ok(());
}

let RemotePublic(gid, _bytes) = result.unwrap();

// TODO verify upper/lower.

// if verify ok, send self public info.
if !is_upper {
println!("DEBUG: send remote after verify");
let len = remote_public_bytes.len() as u32;
writer.write(&(len.to_be_bytes())).await?;
writer.write_all(&remote_public_bytes[..]).await?;
}

let (self_send, mut self_recv) = channel::<StreamMessage>(MAX_MESSAGE_CAPACITY);
let uid = rand::random::<u32>();

server_send
.send(StreamMessage::Open(gid, uid, self_send))
.send(StreamMessage::Open(gid, uid, self_send, is_upper))
.await;

let mut read_len = [0u8; 4];
Expand All @@ -169,8 +276,6 @@ async fn process_stream(
msg = reader.read(&mut read_len).fuse() => match msg {
Ok(size) => {
if size == 0 {
// when close or better when many Ok(0)
server_send.send(StreamMessage::Close(gid, uid)).await;
break;
}

Expand All @@ -186,26 +291,21 @@ async fn process_stream(
} else {
Message::Lower(gid, read_bytes.clone())
};

sender.send(message).await;
break;
}
read_len = [0u8; 4];
}
Err(e) => {
server_send.send(StreamMessage::Close(gid, uid)).await;
break;
}
Err(_e) => break,
},
msg = self_recv.recv().fuse() => match msg {
msg = self_recv.next().fuse() => match msg {
Some(msg) => {
match msg {
StreamMessage::Data(bytes) => {
let len = bytes.len() as u32;
writer.write(&(len.to_be_bytes())).await?;
writer.write_all(&bytes[..]).await?;
}
StreamMessage::Close(_, _) => break,
_ => break,
}
},
Expand All @@ -214,7 +314,31 @@ async fn process_stream(
}
}

println!("close stream: {}", addr);
println!("close layers: {}", addr);
server_send
.send(StreamMessage::Close(gid, uid, is_upper))
.await;

Ok(())
}

#[derive(Debug)]
enum StreamMessage {
Open(GroupId, u32, Sender<StreamMessage>, bool),
Close(GroupId, u32, bool),
Data(Vec<u8>),
}

// Rtemote Public Info, include local transport and public key bytes.
#[derive(Deserialize, Serialize, Clone)]
pub struct RemotePublic(pub GroupId, pub Vec<u8>);

impl RemotePublic {
pub fn from_bytes(bytes: Vec<u8>) -> std::result::Result<Self, ()> {
bincode::deserialize(&bytes).map_err(|_e| ())
}

pub fn to_bytes(&self) -> Vec<u8> {
bincode::serialize(self).unwrap()
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ async fn start_main(
// start inner json_rpc
let (p2p_sender_result, layer_sender_result, rpc_sender_result) = join!(
p2p_start(gid, p2p_config, out_send.clone()),
layer_start(layer_config, out_send.clone()),
layer_start(gid, layer_config, out_send.clone()),
rpc_start(rpc_config, out_send)
);
let (p2p_sender, layer_sender, rpc_sender) =
Expand Down

0 comments on commit 1c9a41a

Please sign in to comment.