Skip to content

Commit

Permalink
fstream: use file, not lw_shared_ptr<file>
Browse files Browse the repository at this point in the history
Since commit 2a62230, our class file is
copyable, using a shared_ptr internally. This means there is no point to
use lw_shared_ptr<file>, as just file could be used.

This patch fixes the fstream.hh APIs to work directly with file, instead
of lw_shared_ptr_file.

Signed-off-by: Nadav Har'El <[email protected]>
  • Loading branch information
nyh authored and avikivity committed Jul 20, 2015
1 parent 0f0ae16 commit 5442879
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 29 deletions.
2 changes: 1 addition & 1 deletion core/file.hh
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public:
}

/// Gets the file size.
future<uint64_t> size() {
future<uint64_t> size() const {
return _file_impl->size();
}

Expand Down
30 changes: 15 additions & 15 deletions core/fstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
#include <string.h>

class file_data_source_impl : public data_source_impl {
lw_shared_ptr<file> _file;
file _file;
uint64_t _pos;
size_t _buffer_size;
public:
file_data_source_impl(lw_shared_ptr<file> f, uint64_t pos, size_t buffer_size)
file_data_source_impl(file f, uint64_t pos, size_t buffer_size)
: _file(std::move(f)), _pos(pos), _buffer_size(buffer_size) {}
virtual future<temporary_buffer<char>> get() override {
using buf_type = temporary_buffer<char>;
return _file->dma_read_bulk<char>(_pos, _buffer_size).then(
return _file.dma_read_bulk<char>(_pos, _buffer_size).then(
[this] (buf_type buf) {
_pos += buf.size();
return std::move(buf);
Expand All @@ -43,23 +43,23 @@ class file_data_source_impl : public data_source_impl {

class file_data_source : public data_source {
public:
file_data_source(lw_shared_ptr<file> f, uint64_t offset, size_t buffer_size)
file_data_source(file f, uint64_t offset, size_t buffer_size)
: data_source(std::make_unique<file_data_source_impl>(
std::move(f), offset, buffer_size)) {}
};

input_stream<char> make_file_input_stream(
lw_shared_ptr<file> f, uint64_t offset, size_t buffer_size) {
file f, uint64_t offset, size_t buffer_size) {
return input_stream<char>(file_data_source(std::move(f), offset, buffer_size));
}

class file_data_sink_impl : public data_sink_impl {
lw_shared_ptr<file> _file;
file _file;
file_output_stream_options _options;
uint64_t _pos = 0;
uint64_t _last_preallocation = 0;
public:
file_data_sink_impl(lw_shared_ptr<file> f, file_output_stream_options options)
file_data_sink_impl(file f, file_output_stream_options options)
: _file(std::move(f)), _options(options) {}
future<> put(net::packet data) { abort(); }
virtual temporary_buffer<char> allocate_buffer(size_t size) override {
Expand Down Expand Up @@ -90,39 +90,39 @@ class file_data_sink_impl : public data_sink_impl {
if (pos + buf_size > _last_preallocation) {
auto old = _last_preallocation;
_last_preallocation = align_down<uint64_t>(old + _options.preallocation_size, _options.preallocation_size);
prealloc = _file->allocate(old, _last_preallocation - old);
prealloc = _file.allocate(old, _last_preallocation - old);
}
return prealloc.then([this, pos, p, buf_size, truncate, buf = std::move(buf)] () mutable {
return _file->dma_write(pos, p, buf_size).then(
return _file.dma_write(pos, p, buf_size).then(
[this, buf = std::move(buf), truncate] (size_t size) {
if (truncate) {
return _file->truncate(_pos);
return _file.truncate(_pos);
}
return make_ready_future<>();
});
});
}
future<> close() {
return _file->flush().then([this] {
return _file->close();
return _file.flush().then([this] {
return _file.close();
});
}
};

class file_data_sink : public data_sink {
public:
file_data_sink(lw_shared_ptr<file> f, file_output_stream_options options)
file_data_sink(file f, file_output_stream_options options)
: data_sink(std::make_unique<file_data_sink_impl>(
std::move(f), options)) {}
};

output_stream<char> make_file_output_stream(lw_shared_ptr<file> f, size_t buffer_size) {
output_stream<char> make_file_output_stream(file f, size_t buffer_size) {
file_output_stream_options options;
options.buffer_size = buffer_size;
return make_file_output_stream(std::move(f), options);
}

output_stream<char> make_file_output_stream(lw_shared_ptr<file> f, file_output_stream_options options) {
output_stream<char> make_file_output_stream(file f, file_output_stream_options options) {
return output_stream<char>(file_data_sink(std::move(f), options), options.buffer_size, true);
}

6 changes: 3 additions & 3 deletions core/fstream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
// given file. Multiple fibers of execution (continuations) may safely open
// multiple input streams concurrently for the same file.
input_stream<char> make_file_input_stream(
lw_shared_ptr<file> file, uint64_t offset = 0,
file file, uint64_t offset = 0,
uint64_t buffer_size = 8192);

struct file_output_stream_options {
Expand All @@ -48,13 +48,13 @@ struct file_output_stream_options {
// newly created file.
// NOTE: flush() should be the last thing to be called on a file output stream.
output_stream<char> make_file_output_stream(
lw_shared_ptr<file> file,
file file,
uint64_t buffer_size = 8192);

/// Create an output_stream for writing starting at the position zero of a
/// newly created file.
/// NOTE: flush() should be the last thing to be called on a file output stream.
output_stream<char> make_file_output_stream(
lw_shared_ptr<file> file,
file file,
file_output_stream_options options);

2 changes: 1 addition & 1 deletion http/file_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ sstring file_interaction_handler::get_extension(const sstring& file) {
struct reader {
reader(file f, std::unique_ptr<reply> rep)
: is(
make_file_input_stream(make_lw_shared<file>(std::move(f)),
make_file_input_stream(std::move(f),
0, 4096)), _rep(std::move(rep)) {
}
input_stream<char> is;
Expand Down
10 changes: 3 additions & 7 deletions tests/fstream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@

struct writer {
output_stream<char> out;
writer(file f) : out(make_file_output_stream(
make_lw_shared<file>(std::move(f)))) {}
writer(file f) : out(make_file_output_stream(std::move(f))) {}
};

struct reader {
input_stream<char> in;
reader(file f) : in(make_file_input_stream(
make_lw_shared<file>(std::move(f)))) {}
reader(file f) : in(make_file_input_stream(std::move(f))) {}
};

SEASTAR_TEST_CASE(test_fstream) {
Expand Down Expand Up @@ -138,15 +136,14 @@ SEASTAR_TEST_CASE(test_fstream_unaligned) {
future<> test_consume_until_end(uint64_t size) {
return open_file_dma("testfile.tmp",
open_flags::rw | open_flags::create | open_flags::truncate).then([size] (file f) {
return do_with(make_lw_shared(std::move(f)), [size] (lw_shared_ptr<file> f) {
return do_with(make_file_output_stream(f), [size] (output_stream<char>& out) {
std::vector<char> buf(size);
std::iota(buf.begin(), buf.end(), 0);
return out.write(buf.data(), buf.size()).then([&out] {
return out.flush();
});
}).then([f] {
return f->size();
return f.size();
}).then([size, f] (size_t real_size) {
BOOST_REQUIRE_EQUAL(size, real_size);
}).then([size, f] {
Expand All @@ -165,7 +162,6 @@ future<> test_consume_until_end(uint64_t size) {
return in.consume(consumer);
});
});
});
});
}

Expand Down
3 changes: 1 addition & 2 deletions tests/linecount.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@
#include <algorithm>

struct reader {
reader(file f) : is(make_file_input_stream(
make_lw_shared<file>(std::move(f)), 0, 4096)) {}
reader(file f) : is(make_file_input_stream(std::move(f), 0, 4096)) {}
input_stream<char> is;
size_t count = 0;

Expand Down

0 comments on commit 5442879

Please sign in to comment.