-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathfprefetch.rs
126 lines (114 loc) · 3.97 KB
/
fprefetch.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// fprefetcher is a file opening queue used by the put command.
// The idea is there is a queue of files you are interested
// in reading in the near future and it lets the OS know the
// intention via whatever readahead mechanism your OS provides.
use std::collections::VecDeque;
use std::fs::File;
use std::path::PathBuf;
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
use std::os::unix::fs::OpenOptionsExt;
}
}
cfg_if::cfg_if! {
if #[cfg(target_os = "macos")] {
// Nothing is needed.
} else if #[cfg(target_os = "openbsd")] {
// Nothing is needed.
} else {
use std::os::unix::io::AsRawFd;
const NUM_PREFETCHED_BYTES: libc::off_t = 128 * 1024 * 1024;
}
}
const NUM_PREOPENED_FILES: usize = 1;
#[derive(Default)]
pub struct ReadaheadFileOpener {
unopened: VecDeque<PathBuf>,
opened: VecDeque<(PathBuf, std::io::Result<File>)>,
}
fn open_file_for_streaming(fpath: &std::path::Path) -> std::io::Result<File> {
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
// Try with O_NOATIME first; if it fails, e.g. because the user we
// run as is not the file owner, retry without..
let f = std::fs::OpenOptions::new()
.read(true)
.custom_flags(libc::O_NOATIME)
.open(fpath)
.or_else(|error| {
match error.kind() {
std::io::ErrorKind::PermissionDenied => {
std::fs::OpenOptions::new()
.read(true)
.open(fpath)
}
_ => Err(error)
}
})?;
} else {
let f = std::fs::OpenOptions::new()
.read(true)
.open(fpath)?;
}
}
cfg_if::cfg_if! {
if #[cfg(target_os = "macos")] {
// XXX can we do anything here?
// Perhaps F_RDADVISE ?
} else if #[cfg(target_os = "openbsd")] {
// XXX can we do anything here?
} else {
// We would like to use something like POSIX_FADV_NOREUSE to preserve
// the user page cache... this is actually a NOOP on linux.
// Instead we can at least boost performance by hinting our access pattern.
match nix::fcntl::posix_fadvise(
f.as_raw_fd(),
0,
0,
nix::fcntl::PosixFadviseAdvice::POSIX_FADV_SEQUENTIAL,
) {
Ok(_) => (),
Err(err) => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("fadvise POSIX_FADV_SEQUENTIAL failed: {}", err),
))
}
};
match nix::fcntl::posix_fadvise(
f.as_raw_fd(),
0,
NUM_PREFETCHED_BYTES,
nix::fcntl::PosixFadviseAdvice::POSIX_FADV_WILLNEED,
) {
Ok(_) => (),
Err(err) => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("fadvise POSIX_FADV_WILLNEED failed: {}", err),
))
}
};
}
}
Ok(f)
}
impl ReadaheadFileOpener {
pub fn new() -> ReadaheadFileOpener {
ReadaheadFileOpener {
unopened: VecDeque::new(),
opened: VecDeque::new(),
}
}
pub fn add_to_queue(&mut self, p: PathBuf) {
self.unopened.push_back(p);
}
pub fn next_file(&mut self) -> Option<(PathBuf, std::io::Result<File>)> {
while !self.unopened.is_empty() && self.opened.len() < NUM_PREOPENED_FILES + 1 {
let p = self.unopened.pop_front().unwrap();
let r = open_file_for_streaming(&p);
self.opened.push_back((p, r))
}
self.opened.pop_front()
}
}