Skip to content

Commit

Permalink
ceph: refactor ceph_sync_read()
Browse files Browse the repository at this point in the history
Avoid allocating memory for the entire user request: striped_read()
does a synchronous OSD request per object, so it doesn't need more than
object size worth of pages at a time.

[ Preserve the comment, changelog. ]

Signed-off-by: "Yan, Zheng" <[email protected]>
Signed-off-by: Ilya Dryomov <[email protected]>
  • Loading branch information
ukernel authored and idryomov committed Oct 22, 2018
1 parent 74c9e6b commit fce7a97
Showing 1 changed file with 106 additions and 113 deletions.
219 changes: 106 additions & 113 deletions fs/ceph/file.c
Original file line number Diff line number Diff line change
Expand Up @@ -556,91 +556,27 @@ enum {
READ_INLINE = 3,
};

/*
* Read a range of bytes striped over one or more objects. Iterate over
* objects we stripe over. (That's not atomic, but good enough for now.)
*
* If we get a short result from the OSD, check against i_size; we need to
* only return a short read to the caller if we hit EOF.
*/
static int striped_read(struct inode *inode,
u64 pos, u64 len,
struct page **pages, int num_pages,
int page_align, int *checkeof)
{
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode);
u64 this_len;
loff_t i_size;
int page_idx;
int ret, read = 0;
bool hit_stripe, was_short;

/*
* we may need to do multiple reads. not atomic, unfortunately.
*/
more:
this_len = len;
page_idx = (page_align + read) >> PAGE_SHIFT;
ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
&ci->i_layout, pos, &this_len,
ci->i_truncate_seq, ci->i_truncate_size,
pages + page_idx, num_pages - page_idx,
((page_align + read) & ~PAGE_MASK));
if (ret == -ENOENT)
ret = 0;
hit_stripe = this_len < len;
was_short = ret >= 0 && ret < this_len;
dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");

i_size = i_size_read(inode);
if (ret >= 0) {
if (was_short && (pos + ret < i_size)) {
int zlen = min(this_len - ret, i_size - pos - ret);
int zoff = page_align + read + ret;
dout(" zero gap %llu to %llu\n",
pos + ret, pos + ret + zlen);
ceph_zero_page_vector_range(zoff, zlen, pages);
ret += zlen;
}

read += ret;
pos += ret;
len -= ret;

/* hit stripe and need continue*/
if (len && hit_stripe && pos < i_size)
goto more;
}

if (read > 0) {
ret = read;
/* did we bounce off eof? */
if (pos + len > i_size)
*checkeof = CHECK_EOF;
}

dout("striped_read returns %d\n", ret);
return ret;
}

/*
* Completely synchronous read and write methods. Direct from __user
* buffer to osd, or directly to user pages (if O_DIRECT).
*
* If the read spans object boundary, just do multiple reads.
* If the read spans object boundary, just do multiple reads. (That's not
* atomic, but good enough for now.)
*
* If we get a short result from the OSD, check against i_size; we need to
* only return a short read to the caller if we hit EOF.
*/
static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
int *checkeof)
int *retry_op)
{
struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file);
struct page **pages;
u64 off = iocb->ki_pos;
int num_pages;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_osd_client *osdc = &fsc->client->osdc;
ssize_t ret;
size_t len = iov_iter_count(to);
u64 off = iocb->ki_pos;
u64 len = iov_iter_count(to);

dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
(file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
Expand All @@ -653,61 +589,118 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
* but it will at least behave sensibly when they are
* in sequence.
*/
ret = filemap_write_and_wait_range(inode->i_mapping, off,
off + len);
ret = filemap_write_and_wait_range(inode->i_mapping, off, off + len);
if (ret < 0)
return ret;

if (unlikely(to->type & ITER_PIPE)) {
ret = 0;
while ((len = iov_iter_count(to)) > 0) {
struct ceph_osd_request *req;
struct page **pages;
int num_pages;
size_t page_off;
ret = iov_iter_get_pages_alloc(to, &pages, len,
&page_off);
if (ret <= 0)
return -ENOMEM;
num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
u64 i_size;
bool more;

req = ceph_osdc_new_request(osdc, &ci->i_layout,
ci->i_vino, off, &len, 0, 1,
CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
NULL, ci->i_truncate_seq,
ci->i_truncate_size, false);
if (IS_ERR(req)) {
ret = PTR_ERR(req);
break;
}

more = len < iov_iter_count(to);

ret = striped_read(inode, off, ret, pages, num_pages,
page_off, checkeof);
if (ret > 0) {
iov_iter_advance(to, ret);
off += ret;
if (unlikely(to->type & ITER_PIPE)) {
ret = iov_iter_get_pages_alloc(to, &pages, len,
&page_off);
if (ret <= 0) {
ceph_osdc_put_request(req);
ret = -ENOMEM;
break;
}
num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
if (ret < len) {
len = ret;
osd_req_op_extent_update(req, 0, len);
more = false;
}
} else {
iov_iter_advance(to, 0);
num_pages = calc_pages_for(off, len);
page_off = off & ~PAGE_MASK;
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
if (IS_ERR(pages)) {
ceph_osdc_put_request(req);
ret = PTR_ERR(pages);
break;
}
}
ceph_put_page_vector(pages, num_pages, false);
} else {
num_pages = calc_pages_for(off, len);
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
if (IS_ERR(pages))
return PTR_ERR(pages);

ret = striped_read(inode, off, len, pages, num_pages,
(off & ~PAGE_MASK), checkeof);
if (ret > 0) {
int l, k = 0;
size_t left = ret;

while (left) {
size_t page_off = off & ~PAGE_MASK;
size_t copy = min_t(size_t, left,
PAGE_SIZE - page_off);
l = copy_page_to_iter(pages[k++], page_off,
copy, to);
off += l;
left -= l;
if (l < copy)

osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
false, false);
ret = ceph_osdc_start_request(osdc, req, false);
if (!ret)
ret = ceph_osdc_wait_request(osdc, req);
ceph_osdc_put_request(req);

i_size = i_size_read(inode);
dout("sync_read %llu~%llu got %zd i_size %llu%s\n",
off, len, ret, i_size, (more ? " MORE" : ""));

if (ret == -ENOENT)
ret = 0;
if (ret >= 0 && ret < len && (off + ret < i_size)) {
int zlen = min(len - ret, i_size - off - ret);
int zoff = page_off + ret;
dout("sync_read zero gap %llu~%llu\n",
off + ret, off + ret + zlen);
ceph_zero_page_vector_range(zoff, zlen, pages);
ret += zlen;
}

if (unlikely(to->type & ITER_PIPE)) {
if (ret > 0) {
iov_iter_advance(to, ret);
off += ret;
} else {
iov_iter_advance(to, 0);
}
ceph_put_page_vector(pages, num_pages, false);
} else {
int idx = 0;
size_t left = ret > 0 ? ret : 0;
while (left > 0) {
size_t len, copied;
page_off = off & ~PAGE_MASK;
len = min_t(size_t, left, PAGE_SIZE - page_off);
copied = copy_page_to_iter(pages[idx++],
page_off, len, to);
off += copied;
left -= copied;
if (copied < len) {
ret = -EFAULT;
break;
}
}
ceph_release_page_vector(pages, num_pages);
}
ceph_release_page_vector(pages, num_pages);

if (ret <= 0 || off >= i_size || !more)
break;
}

if (off > iocb->ki_pos) {
if (ret >= 0 &&
iov_iter_count(to) > 0 && off >= i_size_read(inode))
*retry_op = CHECK_EOF;
ret = off - iocb->ki_pos;
iocb->ki_pos = off;
}

dout("sync_read result %zd\n", ret);
dout("sync_read result %zd retry_op %d\n", ret, *retry_op);
return ret;
}

Expand Down

0 comments on commit fce7a97

Please sign in to comment.