forked from vectordotdev/vector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhttp.rs
119 lines (95 loc) · 3.58 KB
/
http.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use criterion::{criterion_group, Benchmark, Criterion, Throughput};
use futures::Future;
use hyper::service::service_fn_ok;
use hyper::{Body, Response, Server};
use std::net::SocketAddr;
use vector::test_util::{next_addr, random_lines, send_lines, wait_for_tcp};
use vector::{
runtime, sinks, sources,
topology::{self, config},
};
/// Benchmarks a TCP source wired to an HTTP sink whose compression is
/// explicitly set to `Compression::None`.
///
/// For each iteration the setup closure builds and starts a fresh topology
/// on its own runtime; the routine closure then sends `num_lines` random
/// lines of `line_size` each into the TCP source, stops the topology and
/// shuts the runtime down.
fn benchmark_http_no_compression(c: &mut Criterion) {
    let line_size: usize = 100;
    let num_lines: usize = 100_000;
    // Amount of line data sent per iteration, reported to criterion as
    // throughput. The product (10_000_000) fits in a `u32`.
    let total_bytes = (num_lines * line_size) as u32;

    let in_addr = next_addr();
    let out_addr = next_addr();

    // Background HTTP server that the sink under test talks to.
    serve(out_addr);

    let bench = Benchmark::new("http_no_compression", move |b| {
        b.iter_with_setup(
            || {
                let sink = sinks::http::HttpSinkConfig {
                    uri: out_addr.to_string(),
                    compression: Some(sinks::util::Compression::None),
                    ..Default::default()
                };

                let mut config = config::Config::empty();
                config.add_source("in", sources::tcp::TcpConfig::new(in_addr.into()));
                config.add_sink("out", &["in"], sink);

                let mut rt = runtime::Runtime::new().unwrap();
                let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();
                // Do not hand the topology over until the TCP source accepts connections.
                wait_for_tcp(in_addr);

                (rt, topology)
            },
            |(mut rt, topology)| {
                let lines = random_lines(line_size).take(num_lines);
                rt.block_on(send_lines(in_addr, lines)).unwrap();
                rt.block_on(topology.stop()).unwrap();
                rt.shutdown_now().wait().unwrap();
            },
        )
    })
    .sample_size(10)
    .noise_threshold(0.05)
    .throughput(Throughput::Bytes(total_bytes));

    c.bench("http", bench);
}
/// Benchmarks a TCP source wired to an HTTP sink that keeps the sink's
/// default compression setting.
///
/// NOTE(review): the benchmark name implies the default is gzip; confirm
/// against `HttpSinkConfig`'s defaults, since only `..Default::default()` is
/// visible here.
///
/// For each iteration the setup closure builds and starts a fresh topology
/// on its own runtime; the routine closure then sends `num_lines` random
/// lines of `line_size` each into the TCP source, stops the topology and
/// shuts the runtime down.
fn benchmark_http_gzip(c: &mut Criterion) {
    let line_size: usize = 100;
    let num_lines: usize = 100_000;
    // Amount of line data sent per iteration, reported to criterion as
    // throughput. The product (10_000_000) fits in a `u32`.
    let total_bytes = (num_lines * line_size) as u32;

    let in_addr = next_addr();
    let out_addr = next_addr();

    // Background HTTP server that the sink under test talks to.
    serve(out_addr);

    let bench = Benchmark::new("http_gzip", move |b| {
        b.iter_with_setup(
            || {
                // Only the URI is overridden; compression is left at its default.
                let sink = sinks::http::HttpSinkConfig {
                    uri: out_addr.to_string(),
                    ..Default::default()
                };

                let mut config = config::Config::empty();
                config.add_source("in", sources::tcp::TcpConfig::new(in_addr.into()));
                config.add_sink("out", &["in"], sink);

                let mut rt = runtime::Runtime::new().unwrap();
                let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();
                // Do not hand the topology over until the TCP source accepts connections.
                wait_for_tcp(in_addr);

                (rt, topology)
            },
            |(mut rt, topology)| {
                let lines = random_lines(line_size).take(num_lines);
                rt.block_on(send_lines(in_addr, lines)).unwrap();
                rt.block_on(topology.stop()).unwrap();
                rt.shutdown_now().wait().unwrap();
            },
        )
    })
    .sample_size(10)
    .noise_threshold(0.05)
    .throughput(Throughput::Bytes(total_bytes));

    c.bench("http", bench);
}
/// Starts a minimal HTTP server bound to `addr` on a background thread.
///
/// Every request is answered with a response carrying an empty body. The
/// thread's join handle is discarded, so this function returns immediately
/// and the caller cannot join or stop the server thread.
///
/// # Panics
///
/// The spawned thread (not the caller) panics if the server future resolves
/// with an error.
fn serve(addr: SocketAddr) {
    std::thread::spawn(move || {
        let make_service = || service_fn_ok(|_req| Response::new(Body::empty()));

        let fut = Server::bind(&addr)
            .serve(make_service)
            // Format the error into the message: passing a non-string value as
            // the sole `panic!` argument is deprecated and gives an opaque
            // `Box<Any>` panic message instead of the error text.
            .map_err(|e| panic!("HTTP benchmark server failed: {}", e));

        // Drives the server on this thread's own single-threaded runtime.
        tokio::runtime::current_thread::run(fut);
    });
}
// Criterion group named `http` that bundles the two HTTP sink benchmarks
// (uncompressed and default-compression) defined in this file.
criterion_group!(http, benchmark_http_no_compression, benchmark_http_gzip);