Skip to content

Commit a81e499

Browse files
authored
Merge pull request cpp-netlib#725 from carun/example-async-file-uploader
Added example to store large streaming uploads directly to filesystem using async server.
2 parents 111438d + 597a107 commit a81e499

File tree

2 files changed

+266
-0
lines changed

2 files changed

+266
-0
lines changed

libs/network/example/CMakeLists.txt

+14
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ add_executable(trivial_google trivial_google.cpp)
2020

2121
if (UNIX)
2222
add_executable(fileserver http/fileserver.cpp)
23+
add_executable(async_server_file_upload http/async_server_file_upload.cpp)
24+
add_dependencies(async_server_file_upload cppnetlib-server-parsers)
2325
endif (UNIX)
2426
add_dependencies(http_client cppnetlib-uri cppnetlib-client-connections)
2527
add_dependencies(simple_wget cppnetlib-uri cppnetlib-client-connections)
@@ -136,6 +138,17 @@ if (UNIX)
136138
if (OPENSSL_FOUND)
137139
target_link_libraries(fileserver ${OPENSSL_LIBRARIES})
138140
endif(OPENSSL_FOUND)
141+
142+
target_link_libraries(async_server_file_upload
143+
${Boost_LIBRARIES}
144+
${CMAKE_THREAD_LIBS_INIT}
145+
cppnetlib-server-parsers)
146+
if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
147+
target_link_libraries(async_server_file_upload rt)
148+
endif ()
149+
if (OPENSSL_FOUND)
150+
target_link_libraries(async_server_file_upload ${OPENSSL_LIBRARIES})
151+
endif (OPENSSL_FOUND)
139152
endif (UNIX)
140153

141154
set_target_properties(http_client PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CPP-NETLIB_BINARY_DIR}/example)
@@ -152,4 +165,5 @@ endif (OPENSSL_FOUND)
152165

153166
if (UNIX)
154167
set_target_properties(fileserver PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CPP-NETLIB_BINARY_DIR}/example)
168+
set_target_properties(async_server_file_upload PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CPP-NETLIB_BINARY_DIR}/example)
155169
endif (UNIX)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
//
2+
// Copyright 2017 (c) Arun Chandrasekaran <[email protected]>
3+
// Distributed under the Boost Software License, Version 1.0.
4+
// (See accompanying file LICENSE_1_0.txt or copy at
5+
// http://www.boost.org/LICENSE_1_0.txt)
6+
//
7+
8+
//
9+
// Example for performing streaming file upload operations directly to
10+
// filesystem using async server
11+
//
12+
// If you use wget, do the following at the client side:
13+
//
14+
// wget localhost:9190/upload?filename=Earth.mp4
15+
// --post-file=$HOME/Videos/Earth-From-Space.mp4
16+
//
17+
#include <boost/shared_ptr.hpp>
18+
#include <boost/network/protocol/http/server.hpp>
19+
#include <boost/network/utils/thread_pool.hpp>
20+
#include <boost/asio.hpp>
21+
22+
#include <chrono>
23+
#include <condition_variable>
24+
#include <mutex>
25+
#include <map>
26+
27+
struct connection_handler;
28+
29+
typedef boost::network::http::server<connection_handler> server;
30+
31+
///
32+
/// Custom exception type
33+
///
34+
struct file_uploader_exception : public std::runtime_error {
35+
file_uploader_exception(const std::string err) :
36+
std::runtime_error(err) {
37+
}
38+
};
39+
40+
///
41+
/// Encapsulates request & connection
42+
///
43+
struct file_uploader : std::enable_shared_from_this<file_uploader> {
44+
const server::request& req;
45+
server::connection_ptr conn;
46+
47+
std::mutex mtx;
48+
std::condition_variable condvar;
49+
50+
FILE* fp = NULL;
51+
52+
public:
53+
file_uploader(const server::request& req, const server::connection_ptr& conn)
54+
: req(req)
55+
, conn(conn) {
56+
const std::string dest = destination(req);
57+
58+
if (dest.find("/upload") != std::string::npos) {
59+
auto queries = get_queries(dest);
60+
auto fname = queries.find("filename");
61+
if (fname != queries.end()) {
62+
fp = ::fopen(fname->second.c_str(), "wb");
63+
if (!fp) {
64+
throw file_uploader_exception("Failed to open file to write");
65+
}
66+
} else {
67+
throw file_uploader_exception("'filename' cannot be empty");
68+
}
69+
}
70+
}
71+
72+
~file_uploader() {
73+
if (fp) {
74+
::fflush(fp);
75+
::fclose(fp);
76+
}
77+
}
78+
79+
///
80+
/// Non blocking call to initiate the data transfer
81+
///
82+
void async_recv() {
83+
std::size_t content_length = 0;
84+
auto const& headers = req.headers;
85+
for (auto item : headers) {
86+
if (boost::to_lower_copy(item.name) == "content-length") {
87+
content_length = std::stoll(item.value);
88+
break;
89+
}
90+
}
91+
92+
read_chunk(conn, content_length);
93+
}
94+
95+
///
96+
/// The client shall wait by calling this until the transfer is done by
97+
/// the IO threadpool
98+
///
99+
void wait_for_completion() {
100+
std::unique_lock<std::mutex> _(mtx);
101+
condvar.wait(_);
102+
}
103+
104+
private:
105+
///
106+
/// Parses the string and gets the query as a key-value pair
107+
///
108+
/// @param [in] dest String containing the path and the queries, without the fragment,
109+
/// of the form "/path?key1=value1&key2=value2"
110+
///
111+
std::map<std::string, std::string> get_queries(const std::string dest) {
112+
113+
std::size_t pos = dest.find_first_of("?");
114+
115+
std::map<std::string, std::string> queries;
116+
if (pos != std::string::npos) {
117+
std::string query_string = dest.substr(pos + 1);
118+
119+
// Replace '&' with space
120+
for (pos = 0; pos < query_string.size(); pos++) {
121+
if (query_string[pos] == '&') {
122+
query_string[pos] = ' ';
123+
}
124+
}
125+
126+
std::istringstream sin(query_string);
127+
while (sin >> query_string) {
128+
129+
pos = query_string.find_first_of("=");
130+
131+
if (pos != std::string::npos) {
132+
const std::string key = query_string.substr(0, pos);
133+
const std::string value = query_string.substr(pos + 1);
134+
queries[key] = value;
135+
}
136+
}
137+
}
138+
139+
return queries;
140+
}
141+
142+
///
143+
/// Reads a chunk of data
144+
///
145+
/// @param [in] conn Connection to read from
146+
/// @param [in] left2read Size to read
147+
///
148+
void read_chunk(server::connection_ptr conn, std::size_t left2read) {
149+
conn->read(boost::bind(&file_uploader::on_data_ready,
150+
file_uploader::shared_from_this(),
151+
_1, _2, _3, conn, left2read));
152+
}
153+
154+
///
155+
/// Callback that gets called when the data is ready to be consumed
156+
///
157+
void on_data_ready(server::connection::input_range range,
158+
boost::system::error_code error,
159+
std::size_t size,
160+
server::connection_ptr conn,
161+
std::size_t left2read) {
162+
if (!error) {
163+
::fwrite(boost::begin(range), size, 1, fp);
164+
std::size_t left = left2read - size;
165+
if (left > 0)
166+
read_chunk(conn, left);
167+
else
168+
wakeup();
169+
}
170+
}
171+
172+
///
173+
/// Wakesup the waiting thread
174+
///
175+
void wakeup() {
176+
std::unique_lock<std::mutex> _(mtx);
177+
condvar.notify_one();
178+
}
179+
};
180+
181+
///
182+
/// Functor that gets executed whenever there is a packet on the HTTP port
183+
///
184+
struct connection_handler {
185+
///
186+
/// Gets executed whenever there is a packet on the HTTP port.
187+
///
188+
/// @param [in] req Request object that holds the protobuf data
189+
/// @param [in] conn Connection object
190+
///
191+
void operator()(server::request const& req, const server::connection_ptr& conn) {
192+
static std::map<std::string, std::string> headers = {
193+
{"Connection","close"},
194+
{"Content-Type", "text/plain"}
195+
};
196+
197+
const std::string dest = destination(req);
198+
199+
if (req.method == "POST" && dest.find("/upload") != std::string::npos) {
200+
try {
201+
auto start = std::chrono::high_resolution_clock::now();
202+
// Create a file uploader
203+
std::shared_ptr<file_uploader> uploader(new file_uploader(req, conn));
204+
// On success to create, start receiving the data
205+
uploader->async_recv();
206+
// Wait until the data transfer is done by the IO threads
207+
uploader->wait_for_completion();
208+
209+
// Respond to the client
210+
conn->set_status(server::connection::ok);
211+
conn->set_headers(headers);
212+
auto end = std::chrono::high_resolution_clock::now();
213+
std::chrono::duration<double, std::milli> diff = end - start;
214+
std::ostringstream stm;
215+
stm << "Took " << diff.count() << " milliseconds for the transfer." << std::endl;
216+
conn->write(stm.str());
217+
} catch (const file_uploader_exception& e) {
218+
conn->set_status(server::connection::bad_request);
219+
conn->set_headers(headers);
220+
const std::string err = e.what();
221+
conn->write(err);
222+
}
223+
} else {
224+
conn->set_status(server::connection::bad_request);
225+
conn->set_headers(headers);
226+
conn->write("Only path allowed is /upload.");
227+
}
228+
}
229+
};
230+
231+
int main(int ac, const char *av[])
232+
{
233+
if (ac != 2) {
234+
std::cerr << "Usage: " << av[0] << " <listener-port>" << std::endl;
235+
return EXIT_SUCCESS;
236+
}
237+
238+
// Create a connection handler
239+
connection_handler handler;
240+
241+
// Setup the async server
242+
server local_server(server::options(handler)
243+
.address("0.0.0.0")
244+
.port(av[1])
245+
.reuse_address(true)
246+
.thread_pool(std::make_shared<boost::network::utils::thread_pool>(2)));
247+
248+
// Start the server eventloop
249+
local_server.run();
250+
251+
return EXIT_SUCCESS;
252+
}

0 commit comments

Comments
 (0)