Skip to content

Commit

Permalink
#14 refactor: tidy & handle more exceptions from spvn-rs/0.1.1
Browse files Browse the repository at this point in the history
refactor: tidy & handle more exceptions
  • Loading branch information
joshua-auchincloss authored Jun 7, 2023
2 parents d5d7def + 677a177 commit b1ddaad
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 162 deletions.
160 changes: 76 additions & 84 deletions crates/spvn/src/handlers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use colored::Colorize;
use http::response::Builder;
use pyo3::prelude::*;
use spvn_serde::{body_receiver::PyAsyncBodyReceiver, coalesced, state::StateMap, ASGIResponse};
use tracing::debug;
use tokio::task::JoinError;
use tracing::{debug, error};

use crate::handlers::tasks::Scheduler;

Expand All @@ -29,30 +30,29 @@ use tokio_util::sync::CancellationToken;
use tower_service::Service;
type Ra = Result<http::Response<http_body::Full<bytes::Bytes>>, hyper::Error>;

// 88b / request - TODO: ref more
pub struct Bridge {
// state: State,
caller: Arc<Caller>,
// ptr only
scheduler: Arc<Scheduler>,
cancel: Box<CancellationToken>,
// scheduler: Arc<Scheduler>,
peer: SocketAddr,
server: SocketAddr,
// token: CancellationToken,
}

impl Bridge {
pub fn new(
caller: Arc<Caller>,
scheduler: Arc<Scheduler>,
_scheduler: Arc<Scheduler>,
peer: SocketAddr,
server: SocketAddr,
// token: CancellationToken,
) -> Self {
let token = CancellationToken::new();
Self {
caller: caller,
scheduler: scheduler.clone(),
cancel: Box::new(token),
// scheduler: scheduler.clone(),
peer,
server,
// token,
}
}
}
Expand All @@ -64,6 +64,7 @@ fn bail() -> Ra {
.unwrap())
}

/// Servicer errors
fn bail_err(err: hyper::Error) -> Ra {
eprintln!(
"{} an error occurred in the servicer - {:#?}",
Expand All @@ -73,22 +74,16 @@ fn bail_err(err: hyper::Error) -> Ra {
bail()
}

fn bail_py(err: PyErr) -> Ra {
eprintln!(
"{} an error occurred in the caller - {:#?}",
"error".red(),
err
);
/// Generic Lib Errs (internal)
fn bail_anyhow(err: anyhow::Error) -> Ra {
eprintln!("{} an error occurred - {:#?}", "error".red(), err);
bail()
}

struct SendResponse<'a>(&'a std::sync::Mutex<Option<(Builder, Bytes)>>);

impl<'a> SendResponse<'a> {
fn replace(&mut self, other: Option<(Builder, Bytes)>) {
let mut state = self.0.lock().unwrap();
*state = other;
}
/// Tokio runtime errors
fn bail_join(err: JoinError) -> Ra {
eprintln!("{} an error occurred - {:#?}", "error".red(), err);
bail()
}

impl Service<Request<IncomingBody>> for Bridge {
Expand All @@ -108,92 +103,86 @@ impl Service<Request<IncomingBody>> for Bridge {
state: State,
server: SocketAddr,
peer: SocketAddr,
// token: CancellationToken,
) -> Ra {
// let final =
// received body channel
let (tx_bdy, rx_bdy) = crossbeam::channel::bounded::<ASGIResponse>(4);

// outbound reponse serialization op -> see `coalesced::coalesce_from_state`
let (tx_builder, rx_builder) = crossbeam::channel::bounded::<(Builder, Bytes)>(1);
let _resp_mu: Arc<std::sync::Mutex<Option<(Builder, Bytes)>>> =
Arc::new(std::sync::Mutex::new(None));
// let mut sr = SendResponse(&*resp_mu);

// spawn this on a new task so we allow the callback channel to open before py-caller
tokio::spawn(async move {
while let Ok(resp) = rx_bdy.recv() {
let mut state = state.lock().await;
state.0.insert(resp);
}
let state = state.lock().await;
let response = coalesced::coslesce_from_state(&state, Response::builder(), true);
// sr.replace(Some(response));

// captured = Some("".to_string());
let _res = tx_builder.send(response);

// match res {
// Ok(_r) => (),
// Err(_e) => panic!("couldnt send response to channel"),
// }
{
let state = state.lock().await;
let response =
coalesced::coslesce_from_state(&state, Response::builder(), true);
let res: Result<(), crossbeam::channel::SendError<(Builder, Bytes)>> =
tx_builder.send(response);
match res {
Ok(_r) => (),
Err(_e) => {
error!(
"{} couldnt send response to channel, cancelling due to full bail",
"error".red()
);
// token.cancel();
}
}
}
});
// this allows functionality of `await receive()`
let sender = Sender::new(tx_bdy);

let _bail_super = || return bail();

// todo: handle
let _token = CancellationToken::new();
// get this before the body starts reading
let asgi = asgi_from_request(&req, server, peer);

// takes ownership of the request
// currently a blocking op - TODO: synchronize body receivership
let body = body::to_bytes(req.into_body()).await;
let _b = match body {
let val = match body {
Ok(bts) => bts,
Err(err) => return bail_err(err),
};
let receiver = PyAsyncBodyReceiver { val: _b };

let join_caller: Result<Result<(), ()>, tokio::task::JoinError> =
tokio::task::spawn(async move {
let res = Python::with_gil(|py| {
let obj = asgi.to_object(py);
caller.call(py, (obj, receiver, sender))
});
let receiver = PyAsyncBodyReceiver::from(val);
let join_caller = tokio::task::spawn(async move {
let res = Python::with_gil(|py| caller.call(py, (asgi, receiver, sender)));
// do not remove cfg stmt
#[cfg(debug_assertions)]
{
debug!("{:#?}", res);
Ok(())
})
.await;
}
res
})
.await;

match join_caller {
Ok(call) => match call {
Ok(_) => (),
Err(_pye) => {
// eprintln!("{:#?}", pye);
return bail();
Ok(res) => {
match res {
// we dont care about the python response - nothing to receive
Ok(_) => {
match rx_builder.recv() {
// we have a full response
Ok((builder, bts)) => {
return Ok(builder.body(Full::new(bts)).unwrap())
}
// an error receiving
Err(e) => {
error!("{} occured receiving response builder - {}", "error".red(), e);
return bail();
}
}
}
// internal server error, get outta here
Err(err) => return bail_anyhow(err),
}
},
Err(pye) => {
eprintln!("{}", pye)
}
Err(err) => bail_join(err),
}

match rx_builder.recv() {
Ok((builder, bts)) => return Ok(builder.body(Full::new(bts)).unwrap()),
Err(_) => return bail(),
}
// match (*resp_mu).lock().unwrap().take() {
// Some((builder, body)) => {
// let resp = builder.body(Full::new(body)).unwrap();

// // #[cfg(debug_assertions)]
// // {
// // println!("{:#?}", resp)
// // }
// return Ok(resp);
// }
// None => {
// #[cfg(debug_assertions)]
// {
// eprintln!("the receiver failed")
// }
// return bail();
// }
// }
}

let hm: StateMap = StateMap::default();
Expand All @@ -204,6 +193,9 @@ impl Service<Request<IncomingBody>> for Bridge {
state,
self.server,
self.peer,

// TODO: find something to use instead
// self.token.child_token(),
))
}
}
42 changes: 35 additions & 7 deletions crates/spvn/src/spvn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use crate::handlers::{
};

use hyper::server::conn::Http;
use tokio_util::sync::CancellationToken;
use tracing::debug;

use spvn_caller::{PySpawn, service::caller::Caller};
use spvn_caller::{service::caller::Caller, PySpawn};

use futures::executor;
use pyo3::Python;
Expand Down Expand Up @@ -82,14 +83,21 @@ async fn loop_tls(
scheduler: Arc<Scheduler>,
server: SocketAddr,
quiet: bool,
// token: CancellationToken,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
loop {
let (stream, peer) = listener.accept().await?;
let acceptor = acceptor.clone();
let bi = bi.clone();
let scheduler = scheduler.clone();

let service = Bridge::new(bi.clone(), scheduler.clone(), peer, server);
let service = Bridge::new(
bi.clone(),
scheduler.clone(),
peer,
server,
// token.child_token(),
);
if !quiet {
let svc = LogService {
target: "bridge",
Expand Down Expand Up @@ -123,13 +131,20 @@ async fn loop_passthru(
scheduler: Arc<Scheduler>,
server: SocketAddr,
quiet: bool,
// token: CancellationToken,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
loop {
let (stream, peer) = listener.accept().await?;
let bi = bi.clone();
let scheduler = scheduler.clone();

let service = Bridge::new(bi.clone(), scheduler.clone(), peer, server);
let service = Bridge::new(
bi.clone(),
scheduler.clone(),
peer,
server,
// token.child_token(),
);
if !quiet {
let svc = LogService {
target: "bridge",
Expand Down Expand Up @@ -161,22 +176,25 @@ impl Spvn {
&mut self,
pid: usize,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// let token = CancellationToken::new();

let addr = self.cfg.bind.bind;
let listener = crate::startup::listen::spawn_so_reuse(addr).await;
let reffed = PySpawn::gen();

let bi: Arc<Caller> = Arc::new(reffed);

#[cfg(feature = "lifespan")]
{
if self.cfg.lifespan {
let ref_ = bi.clone();
tokio::spawn(async move {
ref_.wait_startup();
let _ = ref_.wait_startup();
});
}
}

if !self.cfg.tls.is_none() {
if self.cfg.tls.is_some() {
// branch so we add the tls acceptor
crate::startup::message::startup_message(pid, addr, true);
let acceptor = TlsAcceptor::from(self.cfg.tls.as_ref().unwrap().clone());
loop_tls(
Expand All @@ -186,11 +204,21 @@ impl Spvn {
self.scheduler.clone(),
addr,
self.cfg.quiet,
// token,
)
.await
} else {
// no tls
crate::startup::message::startup_message(pid, addr, false);
loop_passthru(listener, bi, self.scheduler.clone(), addr, self.cfg.quiet).await
loop_passthru(
listener,
bi,
self.scheduler.clone(),
addr,
self.cfg.quiet,
// token,
)
.await
}
}

Expand Down
Loading

0 comments on commit b1ddaad

Please sign in to comment.