Skip to content

Commit

Permalink
app/outbound/manager: rewrite outbounds loading logic
Browse files Browse the repository at this point in the history
Fixes #134
  • Loading branch information
eycorsican committed Apr 16, 2021
1 parent a33fb8a commit cce8b6e
Showing 1 changed file with 103 additions and 44 deletions.
147 changes: 103 additions & 44 deletions leaf/src/app/outbound/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,17 @@ impl OutboundManager {
fn load_handlers(
outbounds: &protobuf::RepeatedField<Outbound>,
dns_client: Arc<RwLock<DnsClient>>,
) -> Result<(
HashMap<String, Arc<dyn OutboundHandler>>,
Option<String>,
Vec<AbortHandle>,
)> {
let mut handlers: HashMap<String, Arc<dyn OutboundHandler>> = HashMap::new();
let mut default_handler: Option<String> = None;
let mut abort_handles = Vec::new();

handlers: &mut HashMap<String, Arc<dyn OutboundHandler>>,
default_handler: &mut Option<String>,
abort_handles: &mut Vec<AbortHandle>,
) -> Result<()> {
for outbound in outbounds.iter() {
let tag = String::from(&outbound.tag);
if handlers.contains_key(&tag) {
continue;
}
if default_handler.is_none() {
default_handler = Some(String::from(&outbound.tag));
default_handler.replace(String::from(&outbound.tag));
debug!("default handler [{}]", &outbound.tag);
}
let bind_addr = SocketAddr::new(outbound.bind.parse::<IpAddr>()?, 0);
Expand All @@ -95,6 +93,7 @@ impl OutboundManager {
Some(tcp),
Some(udp),
);
trace!("add handler [{}]", &tag);
handlers.insert(tag.clone(), handler);
}
#[cfg(feature = "outbound-drop")]
Expand All @@ -108,6 +107,7 @@ impl OutboundManager {
Some(tcp),
Some(udp),
);
trace!("add handler [{}]", &tag);
handlers.insert(tag.clone(), handler);
}
#[cfg(feature = "outbound-redirect")]
Expand All @@ -134,6 +134,7 @@ impl OutboundManager {
Some(tcp),
Some(udp),
);
trace!("add handler [{}]", &tag);
handlers.insert(tag.clone(), handler);
}
#[cfg(feature = "outbound-socks")]
Expand Down Expand Up @@ -164,6 +165,7 @@ impl OutboundManager {
Some(tcp),
Some(udp),
);
trace!("add handler [{}]", &tag);
handlers.insert(tag.clone(), handler);
}
#[cfg(feature = "outbound-shadowsocks")]
Expand Down Expand Up @@ -194,7 +196,8 @@ impl OutboundManager {
Some(tcp),
Some(udp),
);
handlers.insert(tag, handler);
trace!("add handler [{}]", &tag);
handlers.insert(tag.clone(), handler);
}
#[cfg(feature = "outbound-trojan")]
"trojan" => {
Expand Down Expand Up @@ -222,7 +225,8 @@ impl OutboundManager {
Some(tcp),
Some(udp),
);
handlers.insert(tag, handler);
trace!("add handler [{}]", &tag);
handlers.insert(tag.clone(), handler);
}
#[cfg(feature = "outbound-vmess")]
"vmess" => {
Expand Down Expand Up @@ -252,7 +256,8 @@ impl OutboundManager {
Some(tcp),
Some(udp),
);
handlers.insert(tag, handler);
trace!("add handler [{}]", &tag);
handlers.insert(tag.clone(), handler);
}
#[cfg(feature = "outbound-tls")]
"tls" => {
Expand All @@ -278,6 +283,7 @@ impl OutboundManager {
Some(tcp),
None,
);
trace!("add handler [{}]", &tag);
handlers.insert(tag.clone(), handler);
}
#[cfg(feature = "outbound-ws")]
Expand All @@ -301,6 +307,7 @@ impl OutboundManager {
Some(tcp),
None,
);
trace!("add handler [{}]", &tag);
handlers.insert(tag.clone(), handler);
}
#[cfg(feature = "outbound-quic")]
Expand Down Expand Up @@ -337,6 +344,7 @@ impl OutboundManager {
Some(tcp),
None,
);
trace!("add handler [{}]", &tag);
handlers.insert(tag.clone(), handler);
}
#[cfg(feature = "outbound-h2")]
Expand All @@ -359,19 +367,18 @@ impl OutboundManager {
Some(tcp),
None,
);
trace!("add handler [{}]", &tag);
handlers.insert(tag.clone(), handler);
}
_ => (),
_ => continue,
}
}

let mut parsed = Vec::new();

// FIXME a better way to find outbound deps?
for _i in 0..4 {
for _i in 0..8 {
'outbounds: for outbound in outbounds.iter() {
let tag = String::from(&outbound.tag);
if parsed.contains(&tag) {
if handlers.contains_key(&tag) {
continue;
}
let bind_addr = SocketAddr::new(outbound.bind.parse::<IpAddr>()?, 0);
Expand Down Expand Up @@ -413,6 +420,11 @@ impl OutboundManager {
Some(tcp),
Some(udp),
);
trace!(
"add handler [{}] with actors: {}",
&tag,
settings.actors.join(",")
);
handlers.insert(tag.clone(), handler);
}
#[cfg(feature = "outbound-random")]
Expand Down Expand Up @@ -448,6 +460,11 @@ impl OutboundManager {
Some(tcp),
Some(udp),
);
trace!(
"add handler [{}] with actors: {}",
&tag,
settings.actors.join(",")
);
handlers.insert(tag.clone(), handler);
}
#[cfg(feature = "outbound-failover")]
Expand Down Expand Up @@ -496,6 +513,11 @@ impl OutboundManager {
Some(Box::new(tcp)),
Some(Box::new(udp)),
);
trace!(
"add handler [{}] with actors: {}",
&tag,
settings.actors.join(",")
);
handlers.insert(tag.clone(), handler);
abort_handles.append(&mut tcp_abort_handles);
abort_handles.append(&mut udp_abort_handles);
Expand Down Expand Up @@ -535,6 +557,11 @@ impl OutboundManager {
Some(Box::new(tcp)),
None,
);
trace!(
"add handler [{}] with actors: {}",
&tag,
settings.actors.join(",")
);
handlers.insert(tag.clone(), handler);
abort_handles.append(&mut tcp_abort_handles);
}
Expand Down Expand Up @@ -571,6 +598,11 @@ impl OutboundManager {
Some(tcp),
Some(udp),
);
trace!(
"add handler [{}] with actors: {}",
&tag,
settings.actors.join(",")
);
handlers.insert(tag.clone(), handler);
}
#[cfg(feature = "outbound-retry")]
Expand Down Expand Up @@ -610,30 +642,31 @@ impl OutboundManager {
Some(tcp),
Some(udp),
);
trace!(
"add handler [{}] with actors: {}",
&tag,
settings.actors.join(",")
);
handlers.insert(tag.clone(), handler);
}
_ => (),
_ => continue,
}
parsed.push(tag.clone());
}
}

Ok((handlers, default_handler, abort_handles))
Ok(())
}

fn load_selectors(
outbounds: &protobuf::RepeatedField<Outbound>,
handlers: &mut HashMap<String, Arc<dyn OutboundHandler>>,
) -> Result<super::Selectors> {
let mut selectors: super::Selectors = HashMap::new();

let mut parsed = Vec::new();

selectors: &mut super::Selectors,
) -> Result<()> {
// FIXME a better way to find outbound deps?
for _i in 0..4 {
for _i in 0..8 {
'outbounds: for outbound in outbounds.iter() {
let tag = String::from(&outbound.tag);
if parsed.contains(&tag) {
if handlers.contains_key(&tag) || selectors.contains_key(&tag) {
continue;
}
#[allow(clippy::single_match)]
Expand Down Expand Up @@ -684,15 +717,19 @@ impl OutboundManager {
Some(tcp),
Some(udp),
);
trace!(
"add handler [{}] with actors: {}",
&tag,
settings.actors.join(",")
);
handlers.insert(tag.clone(), handler);
}
_ => (),
_ => continue,
}
parsed.push(tag.clone());
}
}

Ok(selectors)
Ok(())
}

// TODO make this non-async?
Expand All @@ -701,23 +738,29 @@ impl OutboundManager {
outbounds: &protobuf::RepeatedField<Outbound>,
dns_client: Arc<RwLock<DnsClient>>,
) -> Result<()> {
// Save selected outounds.
// Save outound select states.
let mut selected_outbounds = HashMap::new();
for (k, v) in self.selectors.iter() {
selected_outbounds.insert(k.to_owned(), v.read().await.get_selected_tag());
}

let (mut handlers, default_handler, abort_handles) =
Self::load_handlers(outbounds, dns_client)?;

// Abort spawned tasks inside handlers.
for abort_handle in self.abort_handles.iter() {
abort_handle.abort();
// Load new outbounds.
let mut handlers: HashMap<String, Arc<dyn OutboundHandler>> = HashMap::new();
let mut default_handler: Option<String> = None;
let mut abort_handles: Vec<AbortHandle> = Vec::new();
let mut selectors: super::Selectors = HashMap::new();
for _i in 0..4 {
Self::load_handlers(
outbounds,
dns_client.clone(),
&mut handlers,
&mut default_handler,
&mut abort_handles,
)?;
Self::load_selectors(outbounds, &mut handlers, &mut selectors)?;
}

let mut selectors = Self::load_selectors(outbounds, &mut handlers)?;

// Restore selected outbounds.
// Restore outbound select states.
for (k, v) in selected_outbounds.iter() {
for (k2, v2) in selectors.iter_mut() {
if k == k2 {
Expand All @@ -728,6 +771,11 @@ impl OutboundManager {
}
}

// Abort spawned tasks inside handlers.
for abort_handle in self.abort_handles.iter() {
abort_handle.abort();
}

self.handlers = handlers;
self.selectors = Arc::new(selectors);
self.default_handler = default_handler;
Expand All @@ -739,9 +787,20 @@ impl OutboundManager {
outbounds: &protobuf::RepeatedField<Outbound>,
dns_client: Arc<RwLock<DnsClient>>,
) -> Result<Self> {
let (mut handlers, default_handler, abort_handles) =
Self::load_handlers(outbounds, dns_client)?;
let selectors = Self::load_selectors(outbounds, &mut handlers)?;
let mut handlers: HashMap<String, Arc<dyn OutboundHandler>> = HashMap::new();
let mut default_handler: Option<String> = None;
let mut abort_handles: Vec<AbortHandle> = Vec::new();
let mut selectors: super::Selectors = HashMap::new();
for _i in 0..4 {
Self::load_handlers(
outbounds,
dns_client.clone(),
&mut handlers,
&mut default_handler,
&mut abort_handles,
)?;
Self::load_selectors(outbounds, &mut handlers, &mut selectors)?;
}
Ok(OutboundManager {
handlers,
selectors: Arc::new(selectors),
Expand Down

0 comments on commit cce8b6e

Please sign in to comment.