ocfs2/dlm: Refactor dlm_clean_master_list()

This patch refactors dlm_clean_master_list() so as to make it
easier to convert the mle list to a hash.

Signed-off-by: Sunil Mushran <sunil.mushran@oracle.com>
Signed-off-by: Mark Fasheh <mfasheh@suse.com>
This commit is contained in:
Sunil Mushran 2009-02-26 15:00:39 -08:00 committed by Mark Fasheh
parent f77a9a78c3
commit c2cd4a4433

View file

@ -3207,12 +3207,87 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
return ret;
}
/*
* Sets the owner of the lockres, associated to the mle, to UNKNOWN
*/
static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
struct dlm_master_list_entry *mle)
{
struct dlm_lock_resource *res;
unsigned int hash;
/* Find the lockres associated to the mle and set its owner to UNK */
hash = dlm_lockid_hash(mle->u.mlename.name, mle->u.mlename.len);
res = __dlm_lookup_lockres(dlm, mle->u.mlename.name, mle->u.mlename.len,
hash);
if (res) {
spin_unlock(&dlm->master_lock);
/* move lockres onto recovery list */
spin_lock(&res->spinlock);
dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
dlm_move_lockres_to_recovery_list(dlm, res);
spin_unlock(&res->spinlock);
dlm_lockres_put(res);
/* about to get rid of mle, detach from heartbeat */
__dlm_mle_detach_hb_events(dlm, mle);
/* dump the mle */
spin_lock(&dlm->master_lock);
__dlm_put_mle(mle);
spin_unlock(&dlm->master_lock);
}
return res;
}
static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
struct dlm_master_list_entry *mle)
{
__dlm_mle_detach_hb_events(dlm, mle);
spin_lock(&mle->spinlock);
__dlm_unlink_mle(dlm, mle);
atomic_set(&mle->woken, 1);
spin_unlock(&mle->spinlock);
wake_up(&mle->wq);
}
static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
struct dlm_master_list_entry *mle, u8 dead_node)
{
int bit;
BUG_ON(mle->type != DLM_MLE_BLOCK);
spin_lock(&mle->spinlock);
bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
if (bit != dead_node) {
mlog(0, "mle found, but dead node %u would not have been "
"master\n", dead_node);
spin_unlock(&mle->spinlock);
} else {
/* Must drop the refcount by one since the assert_master will
* never arrive. This may result in the mle being unlinked and
* freed, but there may still be a process waiting in the
* dlmlock path which is fine. */
mlog(0, "node %u was expected master\n", dead_node);
atomic_set(&mle->woken, 1);
spin_unlock(&mle->spinlock);
wake_up(&mle->wq);
/* Do not need events any longer, so detach from heartbeat */
__dlm_mle_detach_hb_events(dlm, mle);
__dlm_put_mle(mle);
}
}
void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
{
struct dlm_master_list_entry *mle, *next;
struct dlm_lock_resource *res;
unsigned int hash;
mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
top:
@ -3236,30 +3311,7 @@ top:
* need to clean up if the dead node would have
* been the master. */
if (mle->type == DLM_MLE_BLOCK) {
int bit;
spin_lock(&mle->spinlock);
bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
if (bit != dead_node) {
mlog(0, "mle found, but dead node %u would "
"not have been master\n", dead_node);
spin_unlock(&mle->spinlock);
} else {
/* must drop the refcount by one since the
* assert_master will never arrive. this
* may result in the mle being unlinked and
* freed, but there may still be a process
* waiting in the dlmlock path which is fine. */
mlog(0, "node %u was expected master\n",
dead_node);
atomic_set(&mle->woken, 1);
spin_unlock(&mle->spinlock);
wake_up(&mle->wq);
/* do not need events any longer, so detach
* from heartbeat */
__dlm_mle_detach_hb_events(dlm, mle);
__dlm_put_mle(mle);
}
dlm_clean_block_mle(dlm, mle, dead_node);
continue;
}
@ -3280,51 +3332,21 @@ top:
/* if we have reached this point, this mle needs to
* be removed from the list and freed. */
/* remove from the list early. NOTE: unlinking
* list_head while in list_for_each_safe */
__dlm_mle_detach_hb_events(dlm, mle);
spin_lock(&mle->spinlock);
__dlm_unlink_mle(dlm, mle);
atomic_set(&mle->woken, 1);
spin_unlock(&mle->spinlock);
wake_up(&mle->wq);
dlm_clean_migration_mle(dlm, mle);
mlog(0, "%s: node %u died during migration from "
"%u to %u!\n", dlm->name, dead_node,
mle->master, mle->new_master);
/* if there is a lockres associated with this
* mle, find it and set its owner to UNKNOWN */
hash = dlm_lockid_hash(mle->u.mlename.name, mle->u.mlename.len);
res = __dlm_lookup_lockres(dlm, mle->u.mlename.name,
mle->u.mlename.len, hash);
if (res) {
/* unfortunately if we hit this rare case, our
* lock ordering is messed. we need to drop
* the master lock so that we can take the
* lockres lock, meaning that we will have to
* restart from the head of list. */
spin_unlock(&dlm->master_lock);
/* move lockres onto recovery list */
spin_lock(&res->spinlock);
dlm_set_lockres_owner(dlm, res,
DLM_LOCK_RES_OWNER_UNKNOWN);
dlm_move_lockres_to_recovery_list(dlm, res);
spin_unlock(&res->spinlock);
dlm_lockres_put(res);
/* about to get rid of mle, detach from heartbeat */
__dlm_mle_detach_hb_events(dlm, mle);
/* dump the mle */
spin_lock(&dlm->master_lock);
__dlm_put_mle(mle);
spin_unlock(&dlm->master_lock);
/* If we find a lockres associated with the mle, we've
* hit this rare case that messes up our lock ordering.
* If so, we need to drop the master lock so that we can
* take the lockres lock, meaning that we will have to
* restart from the head of list. */
res = dlm_reset_mleres_owner(dlm, mle);
if (res)
/* restart */
goto top;
}
/* this may be the last reference */
__dlm_put_mle(mle);