forked from lnx-search/rewrk
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
bb284cb
commit aad593f
Showing
6 changed files
with
390 additions
and
231 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,162 +1,201 @@ | ||
use crate::error::AnyError; | ||
use crate::proto::tcp_stream; | ||
use crate::proto::tls; | ||
use crate::proto::tcp_stream::CustomTcpStream; | ||
use crate::results::WorkerResult; | ||
use crate::utils::{get_http1_request, Scheme}; | ||
|
||
use std::str::FromStr; | ||
use std::sync::atomic::{AtomicUsize, Ordering}; | ||
use std::sync::Arc; | ||
use std::time::Instant; | ||
|
||
use tokio::task::JoinHandle; | ||
use tokio::net::TcpStream; | ||
use tokio::sync::mpsc; | ||
use tokio::time::Duration; | ||
|
||
use tokio_rustls::TlsConnector; | ||
use tokio_rustls::webpki::{DNSName, DNSNameRef}; | ||
|
||
use hyper::client::conn; | ||
use hyper::{Body, StatusCode, Uri}; | ||
|
||
use tower::{Service, ServiceExt}; | ||
|
||
use crate::results::WorkerResult; | ||
use crate::utils::get_request; | ||
|
||
/// A single http/1 connection worker | ||
/// | ||
/// Builds a new http client with the http2_only option set either to false. | ||
/// | ||
/// It then waits for the signaller to start sending pings to queue requests, | ||
/// a client can take a request from the queue and then send the request, | ||
/// these times are then measured and compared against previous latencies | ||
/// to work out the min, max, total time and total requests of the given | ||
/// worker which can then be sent back to the controller when the handle | ||
/// is awaited. | ||
pub async fn client( | ||
struct BenchmarkClient { | ||
tls_connector: TlsConnector, | ||
time_for: Duration, | ||
uri_string: String, | ||
predicted_size: usize, | ||
) -> Result<WorkerResult, AnyError> { | ||
let uri = Uri::from_str(&uri_string)?; | ||
uri: Uri, | ||
scheme: Scheme, | ||
host: String, | ||
host_dns: DNSName, | ||
port: u16, | ||
counter: Arc<AtomicUsize> | ||
} | ||
|
||
impl BenchmarkClient { | ||
fn new( | ||
time_for: Duration, | ||
uri_string: String, | ||
predicted_size: usize | ||
) -> Result<Self, AnyError> { | ||
let tls_connector = tls::http1_alpn_connector()?; | ||
|
||
let host = uri.host().ok_or("cant find host")?; | ||
let port = uri.port_u16().unwrap_or(80); | ||
let uri = Uri::from_str(&uri_string)?; | ||
|
||
let host_port = format!("{}:{}", host, port); | ||
let scheme = Scheme::from(uri.scheme_str()); | ||
|
||
let counter = Arc::new(AtomicUsize::new(0)); | ||
let host = uri.host().ok_or("cant find host")?.to_owned(); | ||
let host_dns = DNSNameRef::try_from_ascii_str(&host)?.to_owned(); | ||
|
||
let (disconnect_tx, mut disconnect_rx) = mpsc::channel(1); | ||
let port = match uri.port_u16() { | ||
Some(port) => port, | ||
None => scheme.default_port() | ||
}; | ||
|
||
let counter = Arc::new(AtomicUsize::new(0)); | ||
|
||
Ok(Self { | ||
tls_connector, | ||
time_for, | ||
predicted_size, | ||
uri, | ||
scheme, | ||
host, | ||
host_dns, | ||
port, | ||
counter | ||
}) | ||
} | ||
|
||
let start = Instant::now(); | ||
async fn run(&self) -> Result<WorkerResult, AnyError> { | ||
let start = Instant::now(); | ||
|
||
let mut session = connect_with_retry( | ||
start, | ||
time_for, | ||
&host_port, | ||
counter.clone(), | ||
disconnect_tx.clone(), | ||
) | ||
.await?; | ||
let (mut send_request, mut conn_handle) = self.connect_retry(start, self.time_for).await?; | ||
|
||
let mut times: Vec<Duration> = Vec::with_capacity(predicted_size); | ||
let mut times: Vec<Duration> = Vec::with_capacity(self.predicted_size); | ||
|
||
while time_for > start.elapsed() { | ||
tokio::select! { | ||
val = send_request(&uri, &mut session, &mut times) => { | ||
if let Err(_e) = val { | ||
// Errors are ignored currently. | ||
while self.time_for > start.elapsed() { | ||
tokio::select! { | ||
val = self.bench_request(&mut send_request, &mut times) => { | ||
if let Err(_e) = val { | ||
// Errors are ignored currently. | ||
} | ||
}, | ||
_ = (&mut conn_handle) => { | ||
let (sr, handle) = self.connect_retry(start, self.time_for).await?; | ||
|
||
send_request = sr; | ||
conn_handle = handle; | ||
} | ||
}, | ||
_ = disconnect_rx.recv() => { | ||
session = connect_with_retry( | ||
start, | ||
time_for, | ||
&host_port, | ||
counter.clone(), | ||
disconnect_tx.clone(), | ||
).await?; | ||
}, | ||
}; | ||
} | ||
|
||
let time_taken = start.elapsed(); | ||
|
||
let result = WorkerResult { | ||
total_times: vec![time_taken], | ||
request_times: times, | ||
buffer_sizes: vec![self.counter.load(Ordering::Acquire)] | ||
}; | ||
|
||
Ok(result) | ||
} | ||
|
||
let time_taken = start.elapsed(); | ||
// NOTE: Currently ignoring errors. | ||
async fn bench_request( | ||
&self, | ||
send_request: &mut conn::SendRequest<Body>, | ||
times: &mut Vec<Duration> | ||
) -> Result<(), AnyError> { | ||
let req = get_http1_request(&self.uri); | ||
|
||
let result = WorkerResult { | ||
total_times: vec![time_taken], | ||
request_times: times, | ||
buffer_sizes: vec![counter.load(Ordering::Acquire)], | ||
}; | ||
let ts = Instant::now(); | ||
|
||
Ok(result) | ||
} | ||
if let Err(_) = send_request.ready().await { | ||
return Ok(()); | ||
} | ||
|
||
// NOTE: Currently ignoring errors. | ||
async fn send_request( | ||
uri: &Uri, | ||
session: &mut conn::SendRequest<Body>, | ||
times: &mut Vec<Duration>, | ||
) -> Result<(), AnyError> { | ||
let req = get_request(&uri); | ||
let resp = match send_request.call(req).await { | ||
Ok(v) => v, | ||
Err(_) => return Ok(()) | ||
}; | ||
|
||
let took = ts.elapsed(); | ||
|
||
let status = resp.status(); | ||
assert_eq!(status, StatusCode::OK); | ||
|
||
let ts = Instant::now(); | ||
let _buff = match hyper::body::to_bytes(resp).await { | ||
Ok(v) => v, | ||
Err(_) => return Ok(()) | ||
}; | ||
|
||
times.push(took); | ||
|
||
match session.ready().await { | ||
Ok(_) => (), | ||
Err(_) => return Ok(()), | ||
Ok(()) | ||
} | ||
|
||
let resp = match session.call(req).await { | ||
Ok(v) => v, | ||
Err(_) => return Ok(()), | ||
}; | ||
async fn connect_retry( | ||
&self, | ||
start: Instant, | ||
time_for: Duration | ||
) -> Result<(conn::SendRequest<Body>, JoinHandle<()>), AnyError> { | ||
while start.elapsed() < time_for { | ||
let res = self.connect().await; | ||
|
||
match res { | ||
Ok(val) => return Ok(val), | ||
Err(_) => () | ||
} | ||
} | ||
|
||
let took = ts.elapsed(); | ||
Err("connection closed".into()) | ||
} | ||
|
||
let status = resp.status(); | ||
assert_eq!(status, StatusCode::OK); | ||
async fn connect(&self) -> Result<(conn::SendRequest<Body>, JoinHandle<()>), AnyError> { | ||
let host_port = format!("{}:{}", self.host, self.port); | ||
|
||
let _buff = match hyper::body::to_bytes(resp).await { | ||
Ok(v) => v, | ||
Err(_) => return Ok(()), | ||
}; | ||
let stream = TcpStream::connect(&host_port).await?; | ||
let stream = CustomTcpStream::new(stream, self.counter.clone()); | ||
|
||
times.push(took); | ||
match self.scheme { | ||
Scheme::HTTP => { | ||
let (send_request, connection) = conn::handshake(stream).await?; | ||
let handle = tokio::spawn(async move { | ||
if let Err(_) = connection.await {} | ||
|
||
Ok(()) | ||
} | ||
// Connection died | ||
// Should reconnect and log | ||
}); | ||
|
||
async fn connect_with_retry( | ||
start: Instant, | ||
time_for: Duration, | ||
host_port: &str, | ||
counter: Arc<AtomicUsize>, | ||
disconnect_tx: mpsc::Sender<()>, | ||
) -> Result<conn::SendRequest<Body>, AnyError> { | ||
while start.elapsed() < time_for { | ||
let res = connect(host_port, counter.clone(), disconnect_tx.clone()).await; | ||
|
||
match res { | ||
Ok(session) => return Ok(session), | ||
Err(_) => (), | ||
Ok((send_request, handle)) | ||
}, | ||
Scheme::HTTPS => { | ||
let stream = self.tls_connector.connect(self.host_dns.as_ref(), stream).await?; | ||
|
||
let (send_request, connection) = conn::handshake(stream).await?; | ||
let handle = tokio::spawn(async move { | ||
if let Err(_) = connection.await {} | ||
|
||
// Connection died | ||
// Should reconnect and log | ||
}); | ||
|
||
Ok((send_request, handle)) | ||
} | ||
} | ||
} | ||
|
||
Err("connection closed".into()) | ||
} | ||
|
||
async fn connect( | ||
host_port: &str, | ||
counter: Arc<AtomicUsize>, | ||
disconnect_tx: mpsc::Sender<()>, | ||
) -> Result<conn::SendRequest<Body>, AnyError> { | ||
let stream = | ||
tcp_stream::CustomTcpStream::new(TcpStream::connect(&host_port).await?, counter.clone()); | ||
|
||
let (session, connection) = conn::handshake(stream).await?; | ||
tokio::spawn(async move { | ||
if let Err(_) = connection.await {} | ||
pub async fn client( | ||
time_for: Duration, | ||
uri_string: String, | ||
predicted_size: usize, | ||
) -> Result<WorkerResult, AnyError> { | ||
let benchmark_client = BenchmarkClient::new(time_for, uri_string, predicted_size)?; | ||
|
||
// Connection died | ||
// Should reconnect and log | ||
if let Err(_) = disconnect_tx.send(()).await {} | ||
}); | ||
let result = benchmark_client.run().await?; | ||
|
||
Ok(session) | ||
Ok(result) | ||
} |
Oops, something went wrong.