Skip to content

Commit

Permalink
fstream: introduce safe make_file_output_stream() and make_file_data_…
Browse files Browse the repository at this point in the history
…sink()

Currently these functions are unsafe in that if they fail to construct
the output_stream or the data_sink respectively the `file` instance
passed in will be left unclosed, triggering an assertion failure in the
`~file()`. As these classes take ownership of the file after being
constructed, it makes sense for them to take care of the `file` object
from the moment it is passed in.

For this to be safe the functions have to return a future<>, so that
in the event of a failure callers can wait for the file to be closed.
The new safe variants are added to api_v3 namespace, which is also made
the default api level in this patch.
  • Loading branch information
denesb committed May 19, 2020
1 parent c5c24b6 commit 52e0a76
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 16 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ option (Seastar_SSTRING
ON)

set (Seastar_API_LEVEL
"2"
"3"
CACHE
STRING
"Seastar compatibility API level (2=server_socket::accept() returns accept_result)")
"Seastar compatibility API level (2=server_socket::accept() returns accept_result, 3=make_file_output_stream(), make_file_data_sink() returns future<...>)")

set_property (CACHE Seastar_API_LEVEL
PROPERTY
Expand Down
4 changes: 2 additions & 2 deletions configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ def dialect_supported(dialect, compiler='g++'):
help = 'Extra flags for the linker')
arg_parser.add_argument('--optflags', action = 'store', dest = 'user_optflags', default = '',
help = 'Extra optimization flags for the release mode')
arg_parser.add_argument('--api-level', action='store', dest='api_level', default='2',
help='Compatibility API level (2=latest)')
arg_parser.add_argument('--api-level', action='store', dest='api_level', default='3',
help='Compatibility API level (3=latest)')
arg_parser.add_argument('--compiler', action = 'store', dest = 'cxx', default = 'g++',
help = 'C++ compiler path')
arg_parser.add_argument('--c-compiler', action='store', dest='cc', default='gcc',
Expand Down
33 changes: 33 additions & 0 deletions include/seastar/core/fstream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <seastar/core/file.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/internal/api-level.hh>

namespace seastar {

Expand Down Expand Up @@ -95,22 +96,54 @@ struct file_output_stream_options {
::seastar::io_priority_class io_priority_class = default_priority_class();
};

namespace api_v2 {

/// Create an output_stream for writing starting at the position zero of a
/// newly created file.
/// NOTE: flush() should be the last thing to be called on a file output stream.
[[deprecated("use Seastar_API_LEVEL=3 instead")]]
output_stream<char> make_file_output_stream(
file file,
uint64_t buffer_size = 8192);

/// Create an output_stream for writing starting at the position zero of a
/// newly created file.
/// NOTE: flush() should be the last thing to be called on a file output stream.
[[deprecated("use Seastar_API_LEVEL=3 instead")]]
output_stream<char> make_file_output_stream(
file file,
file_output_stream_options options);

/// Create a data_sink for writing starting at the position zero of a
/// newly created file.
[[deprecated("use Seastar_API_LEVEL=3 instead")]]
data_sink make_file_data_sink(file, file_output_stream_options);

}

namespace api_v3::and_newer {

/// Create an output_stream for writing starting at the position zero of a
/// newly created file.
/// NOTE: flush() should be the last thing to be called on a file output stream.
/// Closes the file if the stream creation fails.
future<output_stream<char>> make_file_output_stream(
file file,
uint64_t buffer_size = 8192) noexcept;

/// Create an output_stream for writing starting at the position zero of a
/// newly created file.
/// NOTE: flush() should be the last thing to be called on a file output stream.
/// Closes the file if the stream creation fails.
future<output_stream<char>> make_file_output_stream(
file file,
file_output_stream_options options) noexcept;

/// Create a data_sink for writing starting at the position zero of a
/// newly created file.
/// Closes the file if the sink creation fails.
future<data_sink> make_file_data_sink(file, file_output_stream_options) noexcept;

}

}
2 changes: 1 addition & 1 deletion include/seastar/core/internal/api-level.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

// For IDEs that don't see SEASTAR_API_LEVEL, generate a nice default
#ifndef SEASTAR_API_LEVEL
#define SEASTAR_API_LEVEL 2
#define SEASTAR_API_LEVEL 3
#endif

#if SEASTAR_API_LEVEL == 3
Expand Down
52 changes: 50 additions & 2 deletions src/core/fstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/reactor.hh>
#include <fmt/format.h>
#include <fmt/ostream.h>
#include <malloc.h>
#include <string.h>

Expand Down Expand Up @@ -431,18 +433,64 @@ class file_data_sink_impl : public data_sink_impl {
}
};

namespace api_v2 {

data_sink make_file_data_sink(file f, file_output_stream_options options) {
return data_sink(std::make_unique<file_data_sink_impl>(std::move(f), options));
}

output_stream<char> make_file_output_stream(file f, size_t buffer_size) {
file_output_stream_options options;
options.buffer_size = buffer_size;
return make_file_output_stream(std::move(f), options);
// Don't generate a deprecation warning for the unsafe functions calling each other.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
return api_v2::make_file_output_stream(std::move(f), options);
#pragma GCC diagnostic pop
}

output_stream<char> make_file_output_stream(file f, file_output_stream_options options) {
return output_stream<char>(make_file_data_sink(std::move(f), options), options.buffer_size, true);
// Don't generate a deprecation warning for the unsafe functions calling each other.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
return output_stream<char>(api_v2::make_file_data_sink(std::move(f), options), options.buffer_size, true);
#pragma GCC diagnostic pop
}

}

namespace api_v3::and_newer {

future<data_sink> make_file_data_sink(file f, file_output_stream_options options) noexcept {
try {
return make_ready_future<data_sink>(std::make_unique<file_data_sink_impl>(f, options));
} catch (...) {
return f.close().then_wrapped([ex = std::current_exception(), f] (future<> fut) mutable {
if (fut.failed()) {
try {
std::rethrow_exception(std::move(ex));
} catch (...) {
std::throw_with_nested(std::runtime_error(fmt::format("While handling failed construction of data_sink, caught exception: {}",
fut.get_exception())));
}
}
return make_exception_future<data_sink>(std::move(ex));
});
}
}

future<output_stream<char>> make_file_output_stream(file f, size_t buffer_size) noexcept {
file_output_stream_options options;
options.buffer_size = buffer_size;
return api_v3::and_newer::make_file_output_stream(std::move(f), options);
}

future<output_stream<char>> make_file_output_stream(file f, file_output_stream_options options) noexcept {
return api_v3::and_newer::make_file_data_sink(std::move(f), options).then([buffer_size = options.buffer_size] (data_sink&& ds) {
return output_stream<char>(std::move(ds), buffer_size, true);
});
}

}

/*
Expand Down
3 changes: 2 additions & 1 deletion tests/perf/fstream_perf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ int main(int ac, char** av) {
foso.buffer_size = buffer_size;
foso.preallocation_size = 32 << 20;
foso.write_behind = concurrency;
auto os = make_file_output_stream(f, foso);
return api_v3::and_newer::make_file_output_stream(f, foso).then([=] (output_stream<char>&& os) {
return do_with(std::move(os), std::move(f), unsigned(0), [=] (output_stream<char>& os, file& f, unsigned& completed) {
auto start = std::chrono::steady_clock::now();
return repeat([=, &os, &completed] {
Expand All @@ -75,6 +75,7 @@ int main(int ac, char** av) {
return os.close();
});
});
});
});
});
}
4 changes: 2 additions & 2 deletions tests/unit/fsnotifier_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ SEASTAR_THREAD_TEST_CASE(test_notify_modify_close_delete) {
| fsnotifier::flags::close
).get0();

auto os = make_file_output_stream(f);
auto os = api_v3::and_newer::make_file_output_stream(f).get0();
os.write("kossa").get();
os.flush().get();

Expand Down Expand Up @@ -88,7 +88,7 @@ SEASTAR_THREAD_TEST_CASE(test_notify_overwrite) {

auto write_file = [](fs::path& p, sstring content) {
auto f = open_file_dma(p.native(), open_flags::create|open_flags::rw).get0();
auto os = make_file_output_stream(f);
auto os = api_v3::and_newer::make_file_output_stream(f).get0();
os.write(content).get();
os.flush().get();
os.close().get();
Expand Down
20 changes: 14 additions & 6 deletions tests/unit/fstream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ namespace fs = compat::filesystem;

struct writer {
output_stream<char> out;
writer(file f) : out(make_file_output_stream(std::move(f))) {}
static future<shared_ptr<writer>> make(file f) {
return api_v3::and_newer::make_file_output_stream(std::move(f)).then([] (output_stream<char>&& os) {
return make_shared<writer>(writer{std::move(os)});
});
}
};

struct reader {
Expand All @@ -60,7 +64,7 @@ SEASTAR_TEST_CASE(test_fstream) {
auto filename = (t.get_path() / "testfile.tmp").native();
return open_file_dma(filename,
open_flags::rw | open_flags::create | open_flags::truncate).then([filename] (file f) {
auto w = make_shared<writer>(std::move(f));
return writer::make(std::move(f)).then([filename] (shared_ptr<writer> w) {
auto buf = static_cast<char*>(::malloc(4096));
memset(buf, 0, 4096);
buf[0] = '[';
Expand Down Expand Up @@ -103,6 +107,7 @@ SEASTAR_TEST_CASE(test_fstream) {
return r->in.close();
}).finally([r] {});
});
});
});
});
}
Expand All @@ -112,7 +117,7 @@ SEASTAR_TEST_CASE(test_consume_skip_bytes) {
auto filename = (t.get_path() / "testfile.tmp").native();
auto f = open_file_dma(filename,
open_flags::rw | open_flags::create | open_flags::truncate).get0();
auto w = make_lw_shared<writer>(std::move(f));
auto w = writer::make(std::move(f)).get0();
auto write_block = [w] (char c, size_t size) {
std::vector<char> vec(size, c);
w->out.write(&vec.front(), vec.size()).get();
Expand Down Expand Up @@ -182,7 +187,7 @@ SEASTAR_TEST_CASE(test_fstream_unaligned) {
auto filename = (t.get_path() / "testfile.tmp").native();
return open_file_dma(filename,
open_flags::rw | open_flags::create | open_flags::truncate).then([filename] (file f) {
auto w = make_shared<writer>(std::move(f));
return writer::make(std::move(f)).then([filename] (shared_ptr<writer> w) {
auto buf = static_cast<char*>(::malloc(40));
memset(buf, 0, 40);
buf[0] = '[';
Expand Down Expand Up @@ -213,6 +218,7 @@ SEASTAR_TEST_CASE(test_fstream_unaligned) {
return r->in.close();
}).finally([r] {});
});
});
});
});
}
Expand All @@ -222,12 +228,14 @@ future<> test_consume_until_end(uint64_t size) {
auto filename = (t.get_path() / "testfile.tmp").native();
return open_file_dma(filename,
open_flags::rw | open_flags::create | open_flags::truncate).then([size] (file f) {
return do_with(make_file_output_stream(f), [size] (output_stream<char>& out) {
return api_v3::and_newer::make_file_output_stream(f).then([size] (output_stream<char>&& os) {
return do_with(std::move(os), [size] (output_stream<char>& out) {
std::vector<char> buf(size);
std::iota(buf.begin(), buf.end(), 0);
return out.write(buf.data(), buf.size()).then([&out] {
return out.flush();
});
});
}).then([f] {
return f.size();
}).then([size, f] (size_t real_size) {
Expand Down Expand Up @@ -284,7 +292,7 @@ SEASTAR_TEST_CASE(test_input_stream_esp_around_eof) {
auto filename = (t.get_path() / "testfile.tmp").native();
auto f = open_file_dma(filename,
open_flags::rw | open_flags::create | open_flags::truncate).get0();
auto out = make_file_output_stream(f);
auto out = api_v3::and_newer::make_file_output_stream(f).get0();
out.write(reinterpret_cast<const char*>(data.data()), data.size()).get();
out.flush().get();
//out.close().get(); // FIXME: closes underlying stream:?!
Expand Down

0 comments on commit 52e0a76

Please sign in to comment.