forked from ververica/ForSt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsequence_file_reader.cc
320 lines (294 loc) · 11.7 KB
/
sequence_file_reader.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "file/sequence_file_reader.h"
#include <algorithm>
#include <mutex>
#include "file/read_write_util.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"
#include "util/random.h"
#include "util/rate_limiter_impl.h"
namespace ROCKSDB_NAMESPACE {
// Opens `fname` on `fs` as a sequential file and, on success, stores a newly
// allocated SequentialFileReader wrapping it in `*reader`.
//
// The reader is built without an IO tracer (nullptr) and with an empty
// listener list; `rate_limiter` is forwarded to it unchanged.
//
// Returns the status produced by FileSystem::NewSequentialFile(). When that
// status is not OK, `*reader` is left untouched.
IOStatus SequentialFileReader::Create(
    const std::shared_ptr<FileSystem>& fs, const std::string& fname,
    const FileOptions& file_opts, std::unique_ptr<SequentialFileReader>* reader,
    IODebugContext* dbg, RateLimiter* rate_limiter) {
  std::unique_ptr<FSSequentialFile> opened_file;
  IOStatus open_status =
      fs->NewSequentialFile(fname, file_opts, &opened_file, dbg);
  if (!open_status.ok()) {
    return open_status;
  }
  reader->reset(new SequentialFileReader(std::move(opened_file), fname, nullptr,
                                         {}, rate_limiter));
  return open_status;
}
// Reads up to `n` bytes from the file into `scratch` and points `*result` at
// the bytes that were actually obtained (which may be fewer than `n`).
//
// If `rate_limiter_priority` is not Env::IO_TOTAL and a rate limiter was
// supplied, every underlying read is sized by rate_limiter_->RequestToken(),
// so one call may issue several smaller reads. Otherwise a single read of the
// full size is attempted.
//
// Two paths exist:
//  * direct I/O: reads an aligned window around [offset, offset + n) into a
//    temporary AlignedBuffer via PositionedRead() and then copies the
//    requested part into `scratch`.
//  * buffered I/O: reads straight into `scratch` via file_->Read().
//
// Returns the status of the last underlying read that was issued. The number
// of bytes in `*result` is added to the bytes_read IO stat.
IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch,
Env::IOPriority rate_limiter_priority) {
IOStatus io_s;
if (use_direct_io()) {
//
//    |-offset_advance-|---bytes returned--|
//    |----------------------buf size-------------------------|
//    |                |                   |                  |
// aligned           offset            offset + n     Roundup(offset + n,
//  offset                                                     alignment)
//
// offset_ is advanced by the full `n` up front, before any data is read.
size_t offset = offset_.fetch_add(n);
size_t alignment = file_->GetRequiredBufferAlignment();
size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
// Number of leading bytes in the aligned window that precede `offset`.
size_t offset_advance = offset - aligned_offset;
// Total size of the aligned window that has to be read.
size_t size = Roundup(offset + n, alignment) - aligned_offset;
size_t r = 0;
AlignedBuffer buf;
buf.Alignment(alignment);
buf.AllocateNewBuffer(size);
// Fill the aligned window, possibly in several rate-limited pieces.
while (buf.CurrentSize() < size) {
size_t allowed;
if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
allowed = rate_limiter_->RequestToken(
buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
rate_limiter_priority, nullptr /* stats */,
RateLimiter::OpType::kRead);
} else {
// Without rate limiting the whole window is requested at once, so this
// branch is only expected on the first iteration.
assert(buf.CurrentSize() == 0);
allowed = size;
}
Slice tmp;
uint64_t orig_offset = 0;
FileOperationInfo::StartTimePoint start_ts;
if (ShouldNotifyListeners()) {
// Capture the file offset and start time of this read for the listeners.
orig_offset = aligned_offset + buf.CurrentSize();
start_ts = FileOperationInfo::StartNow();
}
io_s = file_->PositionedRead(aligned_offset + buf.CurrentSize(), allowed,
IOOptions(), &tmp, buf.Destination(),
nullptr /* dbg */);
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
io_s);
}
// Account for the bytes just read in the buffer's current size.
buf.Size(buf.CurrentSize() + tmp.size());
// Stop on error, or on a short read (fewer bytes than were requested).
if (!io_s.ok() || tmp.size() < allowed) {
break;
}
}
// Hand back only the requested part of the window: skip the
// `offset_advance` leading bytes and return at most `n` bytes.
if (io_s.ok() && offset_advance < buf.CurrentSize()) {
r = buf.Read(scratch, offset_advance,
std::min(buf.CurrentSize() - offset_advance, n));
}
*result = Slice(scratch, r);
} else {
// To be paranoid, modify `scratch` a little so that, if the underlying
// FileSystem reports success without filling the buffer and `scratch`
// still contains a previous block, the returned data will not pass the
// checksum.
// It's hard to find a useful byte for the direct I/O case, so we skip it.
if (n > 0 && scratch != nullptr) {
scratch[0]++;
}
size_t read = 0;
// Read into `scratch`, possibly in several rate-limited pieces.
while (read < n) {
size_t allowed;
if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
allowed = rate_limiter_->RequestToken(
n - read, 0 /* alignment */, rate_limiter_priority,
nullptr /* stats */, RateLimiter::OpType::kRead);
} else {
allowed = n;
}
FileOperationInfo::StartTimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = FileOperationInfo::StartNow();
}
Slice tmp;
io_s = file_->Read(allowed, IOOptions(), &tmp, scratch + read,
nullptr /* dbg */);
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
// NOTE(review): on this path offset_ is advanced only when listeners are
// present; within this function it is used solely to report the offset
// of the finished read to them.
size_t offset = offset_.fetch_add(tmp.size());
NotifyOnFileReadFinish(offset, tmp.size(), start_ts, finish_ts, io_s);
}
read += tmp.size();
// Stop on error, or on a short read (fewer bytes than were requested).
if (!io_s.ok() || tmp.size() < allowed) {
break;
}
}
*result = Slice(scratch, read);
}
// Record the number of bytes returned to the caller.
IOSTATS_ADD(bytes_read, result->size());
return io_s;
}
// Skips `n` bytes of the file.
//
// With buffered I/O the request is forwarded to the underlying file and its
// status is returned. With direct I/O no file call is made: only the reader's
// own offset_ is advanced and OK is returned.
IOStatus SequentialFileReader::Skip(uint64_t n) {
  if (!use_direct_io()) {
    return file_->Skip(n);
  }
  offset_ += static_cast<size_t>(n);
  return IOStatus::OK();
}
namespace {
// This class wraps a SequentialFile, exposing same API, with the differenece
// of being able to prefetch up to readahead_size bytes and then serve them
// from memory, avoiding the entire round-trip if, for example, the data for the
// file is actually remote.
class ReadaheadSequentialFile : public FSSequentialFile {
public:
ReadaheadSequentialFile(std::unique_ptr<FSSequentialFile>&& file,
size_t readahead_size)
: file_(std::move(file)),
alignment_(file_->GetRequiredBufferAlignment()),
readahead_size_(Roundup(readahead_size, alignment_)),
buffer_(),
buffer_offset_(0),
read_offset_(0) {
buffer_.Alignment(alignment_);
buffer_.AllocateNewBuffer(readahead_size_);
}
ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete;
ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete;
IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch,
IODebugContext* dbg) override {
std::unique_lock<std::mutex> lk(lock_);
size_t cached_len = 0;
// Check if there is a cache hit, meaning that [offset, offset + n) is
// either completely or partially in the buffer. If it's completely cached,
// including end of file case when offset + n is greater than EOF, then
// return.
if (TryReadFromCache(n, &cached_len, scratch) &&
(cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
// We read exactly what we needed, or we hit end of file - return.
*result = Slice(scratch, cached_len);
return IOStatus::OK();
}
n -= cached_len;
IOStatus s;
// Read-ahead only make sense if we have some slack left after reading
if (n + alignment_ >= readahead_size_) {
s = file_->Read(n, opts, result, scratch + cached_len, dbg);
if (s.ok()) {
read_offset_ += result->size();
*result = Slice(scratch, cached_len + result->size());
}
buffer_.Clear();
return s;
}
s = ReadIntoBuffer(readahead_size_, opts, dbg);
if (s.ok()) {
// The data we need is now in cache, so we can safely read it
size_t remaining_len;
TryReadFromCache(n, &remaining_len, scratch + cached_len);
*result = Slice(scratch, cached_len + remaining_len);
}
return s;
}
IOStatus Skip(uint64_t n) override {
std::unique_lock<std::mutex> lk(lock_);
IOStatus s = IOStatus::OK();
// First check if we need to skip already cached data
if (buffer_.CurrentSize() > 0) {
// Do we need to skip beyond cached data?
if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) {
// Yes. Skip whaterver is in memory and adjust offset accordingly
n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_;
read_offset_ = buffer_offset_ + buffer_.CurrentSize();
} else {
// No. The entire section to be skipped is entirely i cache.
read_offset_ += n;
n = 0;
}
}
if (n > 0) {
// We still need to skip more, so call the file API for skipping
s = file_->Skip(n);
if (s.ok()) {
read_offset_ += n;
}
buffer_.Clear();
}
return s;
}
IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts,
Slice* result, char* scratch,
IODebugContext* dbg) override {
return file_->PositionedRead(offset, n, opts, result, scratch, dbg);
}
IOStatus InvalidateCache(size_t offset, size_t length) override {
std::unique_lock<std::mutex> lk(lock_);
buffer_.Clear();
return file_->InvalidateCache(offset, length);
}
bool use_direct_io() const override { return file_->use_direct_io(); }
private:
// Tries to read from buffer_ n bytes. If anything was read from the cache, it
// sets cached_len to the number of bytes actually read, copies these number
// of bytes to scratch and returns true.
// If nothing was read sets cached_len to 0 and returns false.
bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
if (read_offset_ < buffer_offset_ ||
read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) {
*cached_len = 0;
return false;
}
uint64_t offset_in_buffer = read_offset_ - buffer_offset_;
*cached_len = std::min(
buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
read_offset_ += *cached_len;
return true;
}
// Reads into buffer_ the next n bytes from file_.
// Can actually read less if EOF was reached.
// Returns the status of the read operastion on the file.
IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts,
IODebugContext* dbg) {
if (n > buffer_.Capacity()) {
n = buffer_.Capacity();
}
assert(IsFileSectorAligned(n, alignment_));
Slice result;
IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg);
if (s.ok()) {
buffer_offset_ = read_offset_;
buffer_.Size(result.size());
assert(result.size() == 0 || buffer_.BufferStart() == result.data());
}
return s;
}
const std::unique_ptr<FSSequentialFile> file_;
const size_t alignment_;
const size_t readahead_size_;
std::mutex lock_;
// The buffer storing the prefetched data
AlignedBuffer buffer_;
// The offset in file_, corresponding to data stored in buffer_
uint64_t buffer_offset_;
// The offset up to which data was read from file_. In fact, it can be larger
// than the actual file size, since the file_->Skip(n) call doesn't return the
// actual number of bytes that were skipped, which can be less than n.
// This is not a problemm since read_offset_ is monotonically increasing and
// its only use is to figure out if next piece of data should be read from
// buffer_ or file_ directly.
uint64_t read_offset_;
};
} // namespace
// Wraps `file` in a ReadaheadSequentialFile that prefetches `readahead_size`
// bytes at a time and returns the wrapper.
//
// If `readahead_size` does not exceed the file's required buffer alignment,
// it is too small to be useful for prefetching and the original file is
// returned unwrapped. Ownership of `file` is transferred in both cases.
std::unique_ptr<FSSequentialFile>
SequentialFileReader::NewReadaheadSequentialFile(
    std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size) {
  const bool readahead_is_useful =
      readahead_size > file->GetRequiredBufferAlignment();
  if (!readahead_is_useful) {
    return std::move(file);
  }
  return std::unique_ptr<FSSequentialFile>(
      new ReadaheadSequentialFile(std::move(file), readahead_size));
}
} // namespace ROCKSDB_NAMESPACE