Skip to content

Commit

Permalink
xprtrdma: Fix client lock-up after application signal fires
Browse files Browse the repository at this point in the history
After a signal, the RPC client aborts synchronous RPCs running on
behalf of the signaled application.

The server is still executing those RPCs, and will write the results
back into the client's memory when it's done. By the time the server
writes the results, that memory is likely being used for other
purposes. Therefore xprtrdma has to immediately invalidate all
memory regions used by those aborted RPCs to prevent the server's
writes from clobbering that re-used memory.

With FMR memory registration, invalidation takes a relatively long
time. In fact, the invalidation is often still running when the
server tries to write the results into the memory regions that are
being invalidated.

This sets up a race between two processes:

1.  After the signal, xprt_rdma_free calls ro_unmap_safe.
2.  While ro_unmap_safe is still running, the server replies and
    rpcrdma_reply_handler runs, calling ro_unmap_sync.

Both processes invoke ib_unmap_fmr on the same FMR.

The mlx4 driver allows two ib_unmap_fmr calls on the same FMR at
the same time, but HCAs generally don't tolerate this. Sometimes
this can result in a system crash.

If the HCA happens to survive, rpcrdma_reply_handler continues. It
removes the rpc_rqst from rq_list and releases the transport_lock.
This enables xprt_rdma_free to run in another process, and the
rpc_rqst is released while rpcrdma_reply_handler is still waiting
for the ib_unmap_fmr call to finish.

But further down in rpcrdma_reply_handler, the transport_lock is
taken again, and "rqst" is dereferenced. If "rqst" has already been
released, this triggers a general protection fault. Since bottom-
halves are disabled, the system locks up.

Address both issues by reversing the order of the xprt_lookup_rqst
call and the ro_unmap_sync call. Introduce a separate lookup
mechanism for rpcrdma_req's to enable calling ro_unmap_sync before
xprt_lookup_rqst. Now the handler takes the transport_lock once
and holds it for the XID lookup and RPC completion.

BugLink: https://bugzilla.linux-nfs.org/show_bug.cgi?id=305
Fixes: 6879164 ('xprtrdma: Invalidate in the RPC reply ... ')
Signed-off-by: Chuck Lever <[email protected]>
Signed-off-by: Anna Schumaker <[email protected]>
  • Loading branch information
chucklever authored and amschuma-ntap committed Jul 13, 2017
1 parent a80d66c commit 431af64
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 31 deletions.
79 changes: 50 additions & 29 deletions net/sunrpc/xprtrdma/rpc_rdma.c
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
rpclen = 0;
}

req->rl_xid = rqst->rq_xid;
rpcrdma_insert_req(&r_xprt->rx_buf, req);

/* This implementation supports the following combinations
* of chunk lists in one RPC-over-RDMA Call message:
*
Expand Down Expand Up @@ -987,11 +990,12 @@ rpcrdma_reply_handler(struct work_struct *work)
{
struct rpcrdma_rep *rep =
container_of(work, struct rpcrdma_rep, rr_work);
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
struct rpcrdma_msg *headerp;
struct rpcrdma_req *req;
struct rpc_rqst *rqst;
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
__be32 *iptr;
int rdmalen, status, rmerr;
unsigned long cwnd;
Expand All @@ -1013,28 +1017,45 @@ rpcrdma_reply_handler(struct work_struct *work)
/* Match incoming rpcrdma_rep to an rpcrdma_req to
* get context for handling any incoming chunks.
*/
spin_lock_bh(&xprt->transport_lock);
rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
if (!rqst)
spin_lock(&buf->rb_lock);
req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
headerp->rm_xid);
if (!req)
goto out_nomatch;

req = rpcr_to_rdmar(rqst);
if (req->rl_reply)
goto out_duplicate;

/* Sanity checking has passed. We are now committed
* to complete this transaction.
*/
list_replace_init(&req->rl_registered, &mws);
rpcrdma_mark_remote_invalidation(&mws, rep);
list_del_init(&rqst->rq_list);

/* Avoid races with signals and duplicate replies
* by marking this req as matched.
*/
req->rl_reply = rep;
spin_unlock_bh(&xprt->transport_lock);
spin_unlock(&buf->rb_lock);

dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
__func__, rep, req, be32_to_cpu(headerp->rm_xid));

xprt->reestablish_timeout = 0;
/* Invalidate and unmap the data payloads before waking the
* waiting application. This guarantees the memory regions
* are properly fenced from the server before the application
* accesses the data. It also ensures proper send flow control:
* waking the next RPC waits until this RPC has relinquished
* all its Send Queue entries.
*/
if (!list_empty(&mws))
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);

/* Perform XID lookup, reconstruction of the RPC reply, and
* RPC completion while holding the transport lock to ensure
* the rep, rqst, and rq_task pointers remain stable.
*/
spin_lock_bh(&xprt->transport_lock);
rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
if (!rqst)
goto out_norqst;
xprt->reestablish_timeout = 0;
if (headerp->rm_vers != rpcrdma_version)
goto out_badversion;

Expand Down Expand Up @@ -1109,17 +1130,6 @@ rpcrdma_reply_handler(struct work_struct *work)
}

out:
/* Invalidate and flush the data payloads before waking the
* waiting application. This guarantees the memory region is
* properly fenced from the server before the application
* accesses the data. It also ensures proper send flow
* control: waking the next RPC waits until this RPC has
* relinquished all its Send Queue entries.
*/
if (!list_empty(&mws))
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);

spin_lock_bh(&xprt->transport_lock);
cwnd = xprt->cwnd;
xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd)
Expand All @@ -1128,7 +1138,7 @@ rpcrdma_reply_handler(struct work_struct *work)
xprt_complete_rqst(rqst->rq_task, status);
spin_unlock_bh(&xprt->transport_lock);
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
__func__, xprt, rqst, status);
__func__, xprt, rqst, status);
return;

out_badstatus:
Expand Down Expand Up @@ -1177,26 +1187,37 @@ rpcrdma_reply_handler(struct work_struct *work)
r_xprt->rx_stats.bad_reply_count++;
goto out;

/* If no pending RPC transaction was matched, post a replacement
* receive buffer before returning.
/* The req was still available, but by the time the transport_lock
* was acquired, the rqst and task had been released. Thus the RPC
* has already been terminated.
*/
out_norqst:
spin_unlock_bh(&xprt->transport_lock);
rpcrdma_buffer_put(req);
dprintk("RPC: %s: race, no rqst left for req %p\n",
__func__, req);
return;

out_shortreply:
dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;

out_nomatch:
spin_unlock_bh(&xprt->transport_lock);
spin_unlock(&buf->rb_lock);
dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
__func__, be32_to_cpu(headerp->rm_xid),
rep->rr_len);
goto repost;

out_duplicate:
spin_unlock_bh(&xprt->transport_lock);
spin_unlock(&buf->rb_lock);
dprintk("RPC: %s: "
"duplicate reply %p to RPC request %p: xid 0x%08x\n",
__func__, rep, req, be32_to_cpu(headerp->rm_xid));

/* If no pending RPC transaction was matched, post a replacement
* receive buffer before returning.
*/
repost:
r_xprt->rx_stats.bad_reply_count++;
if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
Expand Down
3 changes: 2 additions & 1 deletion net/sunrpc/xprtrdma/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,8 @@ xprt_rdma_free(struct rpc_task *task)

dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);

if (unlikely(!list_empty(&req->rl_registered)))
rpcrdma_remove_req(&r_xprt->rx_buf, req);
if (!list_empty(&req->rl_registered))
ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
rpcrdma_unmap_sges(ia, req);
rpcrdma_buffer_put(req);
Expand Down
3 changes: 2 additions & 1 deletion net/sunrpc/xprtrdma/verbs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
spin_lock_init(&buf->rb_recovery_lock);
INIT_LIST_HEAD(&buf->rb_mws);
INIT_LIST_HEAD(&buf->rb_all);
INIT_LIST_HEAD(&buf->rb_pending);
INIT_LIST_HEAD(&buf->rb_stale_mrs);
INIT_DELAYED_WORK(&buf->rb_refresh_worker,
rpcrdma_mr_refresh_worker);
Expand Down Expand Up @@ -1084,7 +1085,7 @@ rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)

req = list_first_entry(&buf->rb_send_bufs,
struct rpcrdma_req, rl_list);
list_del(&req->rl_list);
list_del_init(&req->rl_list);
return req;
}

Expand Down
30 changes: 30 additions & 0 deletions net/sunrpc/xprtrdma/xprt_rdma.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ enum {
struct rpcrdma_buffer;
struct rpcrdma_req {
struct list_head rl_list;
__be32 rl_xid;
unsigned int rl_mapped_sges;
unsigned int rl_connect_cookie;
struct rpcrdma_buffer *rl_buffer;
Expand Down Expand Up @@ -402,6 +403,7 @@ struct rpcrdma_buffer {
int rb_send_count, rb_recv_count;
struct list_head rb_send_bufs;
struct list_head rb_recv_bufs;
struct list_head rb_pending;
u32 rb_max_requests;
atomic_t rb_credits; /* most recent credit grant */

Expand Down Expand Up @@ -550,6 +552,34 @@ void rpcrdma_destroy_req(struct rpcrdma_req *);
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);

static inline void
rpcrdma_insert_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
spin_lock(&buffers->rb_lock);
if (list_empty(&req->rl_list))
list_add_tail(&req->rl_list, &buffers->rb_pending);
spin_unlock(&buffers->rb_lock);
}

static inline struct rpcrdma_req *
rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers, __be32 xid)
{
struct rpcrdma_req *pos;

list_for_each_entry(pos, &buffers->rb_pending, rl_list)
if (pos->rl_xid == xid)
return pos;
return NULL;
}

static inline void
rpcrdma_remove_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
spin_lock(&buffers->rb_lock);
list_del(&req->rl_list);
spin_unlock(&buffers->rb_lock);
}

struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
Expand Down

0 comments on commit 431af64

Please sign in to comment.