Skip to content

Commit

Permalink
md-cluster: Protect communication with mutexes
Browse files Browse the repository at this point in the history
Communication can happen through multiple threads. It is possible that
one thread steps over another threads sequence. So, we use mutexes to
protect both the send and receive sequences.

Send communication is locked through state bit, MD_CLUSTER_SEND_LOCK.
Communication is locked with bit manipulation in order to allow
"lock and hold" for the add operation. In case of an add operation,
if the lock is held, MD_CLUSTER_SEND_LOCKED_ALREADY is set.
When md_update_sb() calls metadata_update_start(), it checks
(in a single statement to avoid races), if the communication
is already locked. If yes, it merely returns zero, else it
locks the token lockresource.

Signed-off-by: Goldwyn Rodrigues <[email protected]>
Signed-off-by: NeilBrown <[email protected]>
  • Loading branch information
GuoqingJiang authored and NeilBrown committed Jan 6, 2016
1 parent 15858fa commit 8b9277c
Showing 1 changed file with 63 additions and 10 deletions.
73 changes: 63 additions & 10 deletions drivers/md/md-cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,26 @@ struct resync_info {
#define MD_CLUSTER_SUSPEND_READ_BALANCING 2
#define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3

/* Lock the send communication. This is done through
* bit manipulation as opposed to a mutex in order to
* accomodate lock and hold. See next comment.
*/
#define MD_CLUSTER_SEND_LOCK 4
/* If cluster operations must lock the communication channel,
* so as to perform extra operations (and no other operation
* is allowed on the MD, such as adding a disk. Token needs
* to be locked and held until the operation completes with
* a md_update_sb(), which would eventually release the lock.
*/
#define MD_CLUSTER_SEND_LOCKED_ALREADY 5


struct md_cluster_info {
/* dlm lock space and resources for clustered raid. */
dlm_lockspace_t *lockspace;
int slot_number;
struct completion completion;
struct mutex recv_mutex;
struct dlm_lock_resource *bitmap_lockres;
struct dlm_lock_resource **other_bitmap_lockres;
struct dlm_lock_resource *resync_lockres;
Expand All @@ -68,6 +82,7 @@ struct md_cluster_info {
struct dlm_lock_resource *no_new_dev_lockres;
struct md_thread *recv_thread;
struct completion newdisk_completion;
wait_queue_head_t wait;
unsigned long state;
};

Expand Down Expand Up @@ -508,9 +523,11 @@ static void recv_daemon(struct md_thread *thread)
struct cluster_msg msg;
int ret;

mutex_lock(&cinfo->recv_mutex);
/*get CR on Message*/
if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
pr_err("md/raid1:failed to get CR on MESSAGE\n");
mutex_unlock(&cinfo->recv_mutex);
return;
}

Expand All @@ -534,33 +551,45 @@ static void recv_daemon(struct md_thread *thread)
ret = dlm_unlock_sync(message_lockres);
if (unlikely(ret != 0))
pr_info("unlock msg failed return %d\n", ret);
mutex_unlock(&cinfo->recv_mutex);
}

/* lock_comm()
/* lock_token()
* Takes the lock on the TOKEN lock resource so no other
* node can communicate while the operation is underway.
* If called again, and the TOKEN lock is alread in EX mode
* return success. However, care must be taken that unlock_comm()
* is called only once.
*/
static int lock_comm(struct md_cluster_info *cinfo)
static int lock_token(struct md_cluster_info *cinfo)
{
int error;

if (cinfo->token_lockres->mode == DLM_LOCK_EX)
return 0;

error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
if (error)
pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
__func__, __LINE__, error);

/* Lock the receive sequence */
mutex_lock(&cinfo->recv_mutex);
return error;
}

/* lock_comm()
* Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
*/
static int lock_comm(struct md_cluster_info *cinfo)
{
wait_event(cinfo->wait,
!test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));

return lock_token(cinfo);
}

static void unlock_comm(struct md_cluster_info *cinfo)
{
WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
mutex_unlock(&cinfo->recv_mutex);
dlm_unlock_sync(cinfo->token_lockres);
clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
wake_up(&cinfo->wait);
}

/* __sendmsg()
Expand Down Expand Up @@ -713,6 +742,8 @@ static int join(struct mddev *mddev, int nodes)
spin_lock_init(&cinfo->suspend_lock);
init_completion(&cinfo->completion);
set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
init_waitqueue_head(&cinfo->wait);
mutex_init(&cinfo->recv_mutex);

mddev->cluster_info = cinfo;

Expand Down Expand Up @@ -843,9 +874,25 @@ static int slot_number(struct mddev *mddev)
return cinfo->slot_number - 1;
}

/*
* Check if the communication is already locked, else lock the communication
* channel.
* If it is already locked, token is in EX mode, and hence lock_token()
* should not be called.
*/
static int metadata_update_start(struct mddev *mddev)
{
return lock_comm(mddev->cluster_info);
struct md_cluster_info *cinfo = mddev->cluster_info;

wait_event(cinfo->wait,
!test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));

/* If token is already locked, return 0 */
if (cinfo->token_lockres->mode == DLM_LOCK_EX)
return 0;

return lock_token(cinfo);
}

static int metadata_update_finish(struct mddev *mddev)
Expand All @@ -870,13 +917,15 @@ static int metadata_update_finish(struct mddev *mddev)
ret = __sendmsg(cinfo, &cmsg);
} else
pr_warn("md-cluster: No good device id found to send\n");
clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
unlock_comm(cinfo);
return ret;
}

static void metadata_update_cancel(struct mddev *mddev)
{
struct md_cluster_info *cinfo = mddev->cluster_info;
clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
unlock_comm(cinfo);
}

Expand Down Expand Up @@ -970,14 +1019,18 @@ static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
ret = -ENOENT;
if (ret)
unlock_comm(cinfo);
else
else {
dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
wake_up(&cinfo->wait);
}
return ret;
}

static void add_new_disk_cancel(struct mddev *mddev)
{
struct md_cluster_info *cinfo = mddev->cluster_info;
clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
unlock_comm(cinfo);
}

Expand Down

0 comments on commit 8b9277c

Please sign in to comment.