Skip to content

Commit

Permalink
socks5包交由服务端处理
Browse files Browse the repository at this point in the history
  • Loading branch information
editso committed Dec 7, 2021
1 parent 00c654b commit f61ab8b
Show file tree
Hide file tree
Showing 16 changed files with 214 additions and 235 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ opt-level = 'z'
codegen-units = 1
panic = 'abort'
debug = false
rpath = true
split-debuginfo = '...'

[workspace]
members = [
Expand All @@ -30,12 +28,13 @@ name = "fuc"
path = "src/client.rs"

[[bin]]
name = "no-log-fuc"
name = "fuc-no-log"
path = "src/no-log-client.rs"


[dependencies]
log = {version = "0.4.14"}
clap = "3.0.0-beta.5"
clap = {version = "3.0.0-beta.5", features = ["yaml"]}
smol = {version = "1.2.5"}
env_logger = "0.9.0"
fuso-core = {path = "./fuso-core"}
Expand Down
2 changes: 1 addition & 1 deletion fuso-api/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ where
async fn forward(self, to: To) -> Result<()> {
let (reader_s, writer_s) = self.split();
let (reader_t, writer_t) = to.split();

smol::future::race(
smol::io::copy(reader_s, writer_t),
smol::io::copy(reader_t, writer_s),
Expand Down
1 change: 0 additions & 1 deletion fuso-api/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ pub struct SafeStream<Inner> {
inner: Rollback<Inner, Buffer<u8>>,
}


pub trait SafeStreamEx<T> {
fn as_safe_stream(self) -> SafeStream<T>;
}
Expand Down
1 change: 1 addition & 0 deletions fuso-api/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use smol::{
net::{AsyncToSocketAddrs, UdpSocket},
Task,
};

use std::sync::Mutex;

use crate::{Buffer, Spwan};
Expand Down
29 changes: 25 additions & 4 deletions fuso-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use smol::{
};

use crate::retain::Heartbeat;
use crate::{bridge::Bridge, packet::Action};
use crate::{
bridge::Bridge,
packet::{Action, Addr},
};

#[allow(unused)]
#[derive(Debug)]
Expand Down Expand Up @@ -118,7 +121,6 @@ impl Fuso {
let action: Result<Action> = packet.try_into();
match action {
Ok(Action::Ping) => {}

Ok(action) => {
match accept_tx
.send(Reactor {
Expand Down Expand Up @@ -158,12 +160,31 @@ impl Fuso {

impl Reactor {
#[inline]
pub async fn join(self) -> Result<TcpStream> {
pub async fn join(self) -> Result<(TcpStream, TcpStream)> {
let to = TcpStream::connect({
match self.action {
Action::Forward(Addr::Domain(domain, port)) => {
log::info!("connect {}:{}", domain, port);
format!("{}:{}", domain, port)
}
Action::Forward(Addr::Socket(addr)) => {
log::info!("connect {}", addr);
addr.to_string()
}
_ => {
return Err("Unsupported operation".into());
}
}
})
.await?;

let mut stream = TcpStream::connect(self.addr)
.await
.map_err(|e| Error::with_io(e))?;

stream.send(Action::Connect(self.conv).into()).await?;
Ok(stream)

Ok((stream, to))
}
}

Expand Down
3 changes: 2 additions & 1 deletion fuso-core/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ pub const CMD_PING: u8 = 0x10;
pub const CMD_CREATE: u8 = 0x30;
pub const CMD_CONNECT: u8 = 0x40;
pub const CMD_FORWARD: u8 = 0x41;
pub const CMD_ERROR: u8 = 0x50;
pub const CMD_ERROR: u8 = 0x50;
pub const CMD_NOTHING: u8 = 0x66;
31 changes: 21 additions & 10 deletions fuso-core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use fuso_api::{AsyncTcpSocketEx, FusoPacket, Result, SafeStreamEx, Spwan};
use crate::retain::Heartbeat;
use crate::{dispatch::DynHandler, packet::Action};
use crate::{
dispatch::{StrategyEx, SafeTcpStream},
dispatch::{SafeTcpStream, StrategyEx},
retain::HeartGuard,
};

Expand All @@ -44,6 +44,7 @@ pub struct FusoStream<IO> {

pub struct Channel {
pub conv: u64,
pub name: String,
pub core: HeartGuard<fuso_api::SafeStream<TcpStream>>,
pub config: Arc<Config>,
pub strategys: Arc<Vec<Arc<Box<DynHandler<Arc<Channel>, Action>>>>>,
Expand Down Expand Up @@ -272,23 +273,24 @@ impl Context {
let (conv, accept_ax) = self.fork().await;
let accept_tx = self.accept_ax.clone();
let clinet_addr = tcp.local_addr().unwrap();
let bind_addr = addr.unwrap_or("0.0.0.0:0".parse().unwrap());
let bind_addr = addr.unwrap_or(([0, 0, 0, 0], 0).into());
let listen = bind_addr.tcp_listen().await?;
let strategys = self.strategys.clone();

let mut core = tcp.guard(5000).await?;
let _ = core.send(Action::Accept(conv).into()).await?;

let strategys = self.strategys.clone();
let name = name.unwrap_or("anonymous".to_string());

let channel = Arc::new(Channel {
conv,
core,
strategys,
name: name.clone(),
config: self.config.clone(),
wait_queue: Arc::new(Mutex::new(VecDeque::new())),
});

let name = name.unwrap_or("anonymous".to_string());

log::info!(
"New mapping [{}] {} -> {}",
name,
Expand Down Expand Up @@ -389,11 +391,19 @@ impl Context {
} else {
let action = action.unwrap();
log::debug!("action {:?}", action);
// 通知客户端需执行的方法
let _ = core.send(action.into()).await;
// 暂时休眠当前这个连接, 该连接可能会超时,
// 并且连接数达到一定数量时可能导致连接积累过多导致无法在建立连接,也就是fd用尽
let _ = channel.suspend(tcp).await;

match action {
Action::Nothing => {
let _ = tcp.close().await;
}
_ => {
// 通知客户端需执行的方法
let _ = core.send(action.into()).await;
// 暂时休眠当前这个连接, 该连接可能会超时,
// 并且连接数达到一定数量时可能导致连接积累过多导致无法在建立连接,也就是fd用尽
let _ = channel.suspend(tcp).await;
}
}
}
}
.detach();
Expand All @@ -412,6 +422,7 @@ impl Context {
}
}


impl<T> FusoStream<T> {
#[inline]
pub fn new(from: T, to: T) -> Self {
Expand Down
7 changes: 4 additions & 3 deletions fuso-core/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::packet::Action;
#[derive(Debug, Clone, Copy)]
pub enum State<T> {
Next,
Release,
Accept(T),
}

Expand Down Expand Up @@ -50,9 +51,8 @@ where
let handle = handle.clone();

match handle.dispose(io.clone(), cx.clone()).await? {
State::Accept(()) => {
return Ok(());
}
State::Accept(()) => return Ok(()),
State::Release => return Ok(()),
State::Next => {
log::debug!("[dispatch] Next handler, rollback");
}
Expand Down Expand Up @@ -94,6 +94,7 @@ where
State::Accept(action) => {
return Ok(action);
}
State::Release => return Ok(Action::Nothing),
State::Next => {
// io.back().await?;
log::debug!("[dispatch] Next handler, rollback");
Expand Down
1 change: 1 addition & 0 deletions fuso-core/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ where
for chain in chains.iter() {
match chain(o.clone(), c.clone()).await? {
State::Accept(a) => return Ok(State::Accept(a)),
State::Release => return Ok(State::Release),
State::Next => {}
}
}
Expand Down
4 changes: 3 additions & 1 deletion fuso-core/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub enum Action {
Forward(Addr),
Connect(u64),
Err(String),
Nothing,
}

impl Addr {
Expand Down Expand Up @@ -93,7 +94,7 @@ impl TryFrom<&[u8]> for Addr {
if buf.len() <= size {
return Err(fuso_api::ErrorKind::BadPacket.into());
} else {
let domain = String::from_utf8_lossy(&buf[1..size]).into();
let domain = String::from_utf8_lossy(&buf[..size]).into();
buf.advance(size);
domain
}
Expand Down Expand Up @@ -205,6 +206,7 @@ impl From<Action> for fuso_api::Packet {
}),
Action::Forward(addr) => Packet::new(CMD_FORWARD, addr.to_bytes()),
Action::Err(e) => Packet::new(CMD_ERROR, e.into()),
Action::Nothing => Packet::new(CMD_RESET, Bytes::new()),
}
}
}
55 changes: 55 additions & 0 deletions src/assets/client-cfg.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
name: app
version: '1.0'
author: --
about: https://github.com/editso/fuso
args:
- server-host:
default_value: '127.0.0.1'
about: 服务端监听的地址
- server-port:
default_value: '9003'
about: 服务端监听的端口
- forward-host:
short: h
long: host
default_value: '127.0.0.1'
takes_value: true
about: 转发地址, (如果开启了socks代理该参数将无效)
- forward-port:
short: p
long: port
default_value: '80'
takes_value: true
about: 转发的端口 (如果开启了socks代理该参数将无效)
- forward-type:
short: t
long: type
takes_value: true
possible_values:
- socks
- forward
default_value: forward
about: 转发类型
- service-bind-port:
short: b
long: bind
takes_value: true
about: 真实映射成功后访问的端口号, 不指定将自动分配
- xor-secret:
short: x
long: xor
default_value: '27'
about: 传输时使用异或加密的Key
- bridge-bind-host:
long: bridge-host
takes_value: true
about: 桥接服务监听的地址
- bridge-bind-port:
long: bridge-port
takes_value: true
about: 桥接服务监听的端口
- name:
short: n
long: name
takes_value: true
about: 自定义当前映射服务的名称
Loading

0 comments on commit f61ab8b

Please sign in to comment.