Skip to content

Commit

Permalink
crimson/os/seastore: misc cleanup and reformat
Browse files Browse the repository at this point in the history
Signed-off-by: Yingxin Cheng <[email protected]>
  • Loading branch information
cyx1231st committed Dec 9, 2021
1 parent 2cd753a commit 2241111
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 162 deletions.
223 changes: 115 additions & 108 deletions src/crimson/os/seastore/extent_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,39 +72,40 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents(
).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
auto segment_nonce = segment_header.segment_nonce;
return seastar::do_with(
found_record_handler_t(
[extents, this](
paddr_t base,
const record_header_t &header,
const bufferlist &mdbuf) mutable {
found_record_handler_t([extents, this](
paddr_t base,
const record_header_t& header,
const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
{
auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf);
if (!maybe_record_extent_infos) {
// This should be impossible, we did check the crc on the mdbuf
logger().error(
"ExtentReader::scan_extents: unable to decode extents for record {}",
base);
return crimson::ct_error::input_output_error::make();
}

auto infos = try_decode_extent_infos(
header,
mdbuf);
if (!infos) {
// This should be impossible, we did check the crc on the mdbuf
logger().error(
"ExtentReader::scan_extents unable to decode extents for record {}",
base);
assert(infos);
}

paddr_t extent_offset = base.add_offset(header.mdlength);
for (const auto &i : *infos) {
extents->emplace_back(extent_offset, i);
auto& seg_addr = extent_offset.as_seg_paddr();
seg_addr.set_segment_off(
seg_addr.get_segment_off() + i.len);
}
return scan_extents_ertr::now();
}),
paddr_t extent_offset = base.add_offset(header.mdlength);
logger().debug("ExtentReader::scan_extents: decoded {} extents",
maybe_record_extent_infos->size());
for (const auto &i : *maybe_record_extent_infos) {
extents->emplace_back(extent_offset, i);
auto& seg_addr = extent_offset.as_seg_paddr();
seg_addr.set_segment_off(
seg_addr.get_segment_off() + i.len);
}
return scan_extents_ertr::now();
}),
[=, &cursor](auto &dhandler) {
return scan_valid_records(
cursor,
segment_nonce,
bytes_to_read,
dhandler).discard_result();
});
return scan_valid_records(
cursor,
segment_nonce,
bytes_to_read,
dhandler
).discard_result();
}
);
}).safe_then([ret=std::move(ret)] {
return std::move(*ret);
});
Expand Down Expand Up @@ -140,7 +141,8 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
cursor.last_valid_header_found = true;
return scan_valid_records_ertr::now();
} else {
auto new_committed_to = md->first.committed_to;
auto& [header, md_bl] = *md;
auto new_committed_to = header.committed_to;
logger().debug(
"ExtentReader::scan_valid_records: valid record read at {}, now committed at {}",
cursor.seq,
Expand All @@ -150,9 +152,9 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
cursor.last_committed = new_committed_to;
cursor.pending_records.emplace_back(
cursor.seq.offset,
md->first,
md->second);
cursor.increment(md->first.dlength + md->first.mdlength);
header,
std::move(md_bl));
cursor.increment(header.dlength + header.mdlength);
ceph_assert(new_committed_to == journal_seq_t() ||
new_committed_to < cursor.seq);
return scan_valid_records_ertr::now();
Expand All @@ -178,14 +180,8 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
return scan_valid_records_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
}
budget_used +=
next.header.dlength + next.header.mdlength;
return handler(
next.offset,
next.header,
next.mdbuffer
).safe_then([&cursor] {
cursor.pending_records.pop_front();
return consume_next_records(cursor, handler, budget_used
).safe_then([] {
return scan_valid_records_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::no);
});
Expand All @@ -195,21 +191,12 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
assert(!cursor.pending_records.empty());
auto &next = cursor.pending_records.front();
return read_validate_data(next.offset, next.header
).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
).safe_then([this, &budget_used, &cursor, &handler](auto valid) {
if (!valid) {
cursor.pending_records.clear();
return scan_valid_records_ertr::now();
}
budget_used +=
next.header.dlength + next.header.mdlength;
return handler(
next.offset,
next.header,
next.mdbuffer
).safe_then([&cursor] {
cursor.pending_records.pop_front();
return scan_valid_records_ertr::now();
});
return consume_next_records(cursor, handler, budget_used);
});
}
}().safe_then([=, &budget_used, &cursor] {
Expand Down Expand Up @@ -244,61 +231,63 @@ ExtentReader::read_validate_record_metadata(
logger().debug("read_validate_record_metadata: reading header block {}...",
start);
return segment_manager.read(start, block_size
).safe_then(
[=, &segment_manager](bufferptr bptr) mutable
-> read_validate_record_metadata_ret {
auto block_size = segment_manager.get_block_size();
bufferlist bl;
bl.append(bptr);
auto bp = bl.cbegin();
record_header_t header;
try {
decode(header, bp);
} catch (ceph::buffer::error &e) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
if (header.segment_nonce != nonce) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
auto& seg_addr = start.as_seg_paddr();
if (header.mdlength > (extent_len_t)block_size) {
if (seg_addr.get_segment_off() + header.mdlength >
(int64_t)segment_manager.get_segment_size()) {
return crimson::ct_error::input_output_error::make();
}
return segment_manager.read(
paddr_t::make_seg_paddr(seg_addr.get_segment_id(),
seg_addr.get_segment_off() + (segment_off_t)block_size),
header.mdlength - block_size).safe_then(
[header=std::move(header), bl=std::move(bl)](
auto &&bptail) mutable {
bl.push_back(bptail);
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::make_pair(std::move(header), std::move(bl)));
});
} else {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::make_pair(std::move(header), std::move(bl))
);
}
}).safe_then([=](auto p) {
if (p && validate_metadata(p->second)) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::move(*p)
);
} else {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
).safe_then([=, &segment_manager](bufferptr bptr) mutable
-> read_validate_record_metadata_ret {
auto block_size = static_cast<extent_len_t>(
segment_manager.get_block_size());
bufferlist bl;
bl.append(bptr);
auto bp = bl.cbegin();
record_header_t header;
try {
decode(header, bp);
} catch (ceph::buffer::error &e) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
if (header.segment_nonce != nonce) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
auto& seg_addr = start.as_seg_paddr();
if (seg_addr.get_segment_off() + header.mdlength >
(int64_t)segment_manager.get_segment_size()) {
logger().error("read_validate_record_metadata: failed, invalid header");
return crimson::ct_error::input_output_error::make();
}
if (header.mdlength == block_size) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::make_pair(std::move(header), std::move(bl))
);
}
return segment_manager.read(
paddr_t::make_seg_paddr(
seg_addr.get_segment_id(),
seg_addr.get_segment_off() + (segment_off_t)block_size
),
header.mdlength - block_size
).safe_then([header=std::move(header), bl=std::move(bl)
](auto&& bptail) mutable {
bl.push_back(bptail);
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::make_pair(std::move(header), std::move(bl)));
});
}).safe_then([this](auto p) {
if (p && validate_metadata(p->second)) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::move(*p)
);
} else {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
});
}

std::optional<std::vector<extent_info_t>>
Expand Down Expand Up @@ -355,4 +344,22 @@ bool ExtentReader::validate_metadata(const bufferlist &bl)
return test_crc == recorded_crc;
}

ExtentReader::consume_record_group_ertr::future<>
ExtentReader::consume_next_records(
scan_valid_records_cursor& cursor,
found_record_handler_t& handler,
std::size_t& budget_used)
{
auto& next = cursor.pending_records.front();
auto total_length = next.header.dlength + next.header.mdlength;
budget_used += total_length;
return handler(
next.offset,
next.header,
next.mdbuffer
).safe_then([&cursor] {
cursor.pending_records.pop_front();
});
}

} // namespace crimson::os::seastore
7 changes: 7 additions & 0 deletions src/crimson/os/seastore/extent_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ class ExtentReader {
/// future resolution
);

using consume_record_group_ertr = scan_valid_records_ertr;
consume_record_group_ertr::future<> consume_next_records(
scan_valid_records_cursor& cursor,
found_record_handler_t& handler,
std::size_t& budget_used);


/// validate embedded metadata checksum
static bool validate_metadata(const bufferlist &bl);

Expand Down
Loading

0 comments on commit 2241111

Please sign in to comment.