[DLM] dlm: user locks

This changes the way the dlm handles user locks.  The core dlm is now
aware of user locks so they can be dealt with more efficiently.  There is
no more dlm_device module which previously managed its own duplicate copy
of every user lock.

Signed-off-by: Patrick Caulfield <pcaulfie@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>

diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig
index 09e78bf..490f85b 100644
--- a/fs/dlm/Kconfig
+++ b/fs/dlm/Kconfig
@@ -10,14 +10,6 @@
 	A general purpose distributed lock manager for kernel or userspace
 	applications.
 
-config DLM_DEVICE
-	tristate "DLM device for userspace access"
-	depends on DLM
-	help
-	This module creates a misc device through which the dlm lockspace
-	and locking functions become available to userspace applications
-	(usually through the libdlm library).
-
 config DLM_DEBUG
 	bool "DLM debugging"
 	depends on DLM
diff --git a/fs/dlm/Makefile b/fs/dlm/Makefile
index 1e6232e..1832e02 100644
--- a/fs/dlm/Makefile
+++ b/fs/dlm/Makefile
@@ -1,6 +1,4 @@
 obj-$(CONFIG_DLM) +=		dlm.o
-obj-$(CONFIG_DLM_DEVICE) +=	dlm_device.o
-
 dlm-y :=			ast.o \
 				config.o \
 				dir.o \
@@ -15,7 +13,7 @@
 				recover.o \
 				recoverd.o \
 				requestqueue.o \
+				user.o \
 				util.o
 dlm-$(CONFIG_DLM_DEBUG) +=	debug_fs.o
 
-dlm_device-y :=			device.o
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 57bdf09..a211330 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -13,7 +13,7 @@
 
 #include "dlm_internal.h"
 #include "lock.h"
-#include "ast.h"
+#include "user.h"
 
 #define WAKE_ASTS  0
 
@@ -34,6 +34,11 @@
 
 void dlm_add_ast(struct dlm_lkb *lkb, int type)
 {
+	if (lkb->lkb_flags & DLM_IFL_USER) {
+		dlm_user_add_ast(lkb, type);
+		return;
+	}
+
 	spin_lock(&ast_queue_lock);
 	if (!(lkb->lkb_ast_type & (AST_COMP | AST_BAST))) {
 		kref_get(&lkb->lkb_ref);
diff --git a/fs/dlm/device.c b/fs/dlm/device.c
deleted file mode 100644
index 825bbc0..0000000
--- a/fs/dlm/device.c
+++ /dev/null
@@ -1,1239 +0,0 @@
-/******************************************************************************
-*******************************************************************************
-**
-**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
-**
-**  This copyrighted material is made available to anyone wishing to use,
-**  modify, copy, or redistribute it subject to the terms and conditions
-**  of the GNU General Public License v.2.
-**
-*******************************************************************************
-******************************************************************************/
-
-/*
- * device.c
- *
- * This is the userland interface to the DLM.
- *
- * The locking is done via a misc char device (find the
- * registered minor number in /proc/misc).
- *
- * User code should not use this interface directly but
- * call the library routines in libdlm.a instead.
- *
- */
-
-#include <linux/miscdevice.h>
-#include <linux/init.h>
-#include <linux/wait.h>
-#include <linux/module.h>
-#include <linux/file.h>
-#include <linux/fs.h>
-#include <linux/poll.h>
-#include <linux/signal.h>
-#include <linux/spinlock.h>
-#include <linux/idr.h>
-
-#include <linux/dlm.h>
-#include <linux/dlm_device.h>
-
-#include "lvb_table.h"
-
-static struct file_operations _dlm_fops;
-static const char *name_prefix="dlm";
-static struct list_head user_ls_list;
-static struct mutex user_ls_lock;
-
-/* Flags in li_flags */
-#define LI_FLAG_COMPLETE   1
-#define LI_FLAG_FIRSTLOCK  2
-#define LI_FLAG_PERSISTENT 3
-#define LI_FLAG_ONLIST     4
-
-/* flags in ls_flags*/
-#define LS_FLAG_DELETED   1
-#define LS_FLAG_AUTOFREE  2
-
-/* flags in ls_flags*/
-#define FI_FLAG_OPEN      1
-#define FI_FLAG_COMPAT    2
-
-#define LOCKINFO_MAGIC 0x53595324
-
-struct lock_info {
-	uint32_t li_magic;
-	uint8_t li_cmd;
-	int8_t	li_grmode;
-	int8_t  li_rqmode;
-	struct dlm_lksb li_lksb;
-	wait_queue_head_t li_waitq;
-	unsigned long li_flags;
-	void __user *li_castparam;
-	void __user *li_castaddr;
-	void __user *li_bastparam;
-	void __user *li_bastaddr;
-	void __user *li_pend_bastparam;
-	void __user *li_pend_bastaddr;
-	struct list_head li_ownerqueue;
-	struct file_info *li_file;
-	struct dlm_lksb __user *li_user_lksb;
-	struct completion li_firstcomp;
-};
-
-/* A queued AST no less */
-struct ast_info {
-	struct dlm_lock_result result;
-	struct list_head list;
-	uint32_t lvb_updated;
-	uint32_t progress;      /* How much has been read */
-};
-
-/* One of these per userland lockspace */
-struct user_ls {
-	void    *ls_lockspace;
-	atomic_t ls_refcnt;
-	long     ls_flags;
-
-	/* Lock infos are stored in here indexed by lock ID */
-	struct idr lockinfo_idr;
-	rwlock_t lockinfo_lock;
-
-	/* Passed into misc_register() */
-	struct miscdevice ls_miscinfo;
-	struct list_head  ls_list;
-};
-
-/* misc_device info for the control device */
-static struct miscdevice ctl_device;
-
-/*
- * Stuff we hang off the file struct.
- * The first two are to cope with unlocking all the
- * locks help by a process when it dies.
- */
-struct file_info {
-	struct list_head    fi_li_list;  /* List of active lock_infos */
-	spinlock_t          fi_li_lock;
-	struct list_head    fi_ast_list; /* Queue of ASTs to be delivered */
-	spinlock_t          fi_ast_lock;
-	wait_queue_head_t   fi_wait;
-	struct user_ls     *fi_ls;
-	atomic_t            fi_refcnt;   /* Number of users */
-	unsigned long       fi_flags;
-};
-
-#ifdef CONFIG_COMPAT
-
-struct dlm_lock_params32 {
-	__u8 mode;
-	__u8 namelen;
-	__u16 flags;
-	__u32 lkid;
-	__u32 parent;
-
-	__u32 castparam;
-	__u32 castaddr;
-	__u32 bastparam;
-	__u32 bastaddr;
-	__u32 lksb;
-
-	char lvb[DLM_USER_LVB_LEN];
-	char name[0];
-};
-
-struct dlm_write_request32 {
-	__u32 version[3];
-	__u8 cmd;
-	__u8 is64bit;
-	__u8 unused[2];
-
-	union  {
-		struct dlm_lock_params32 lock;
-		struct dlm_lspace_params lspace;
-	} i;
-};
-
-struct dlm_lksb32 {
-	__u32 	 sb_status;
-	__u32    sb_lkid;
-	__u8 	 sb_flags;
-	__u32    sb_lvbptr;
-};
-
-struct dlm_lock_result32 {
-	__u32 length;
-	__u32 user_astaddr;
-	__u32 user_astparam;
-	__u32 user_lksb;
-	struct dlm_lksb32 lksb;
-	__u8 bast_mode;
-	__u8 unused[3];
-	/* Offsets may be zero if no data is present */
-	__u32 lvb_offset;
-};
-
-
-static void compat_input(struct dlm_write_request *kparams, struct dlm_write_request32 *k32params)
-{
-
-	kparams->version[0] = k32params->version[0];
-	kparams->version[1] = k32params->version[1];
-	kparams->version[2] = k32params->version[2];
-
-	kparams->cmd = k32params->cmd;
-	kparams->is64bit = k32params->is64bit;
-	if (kparams->cmd == DLM_USER_CREATE_LOCKSPACE ||
-	    kparams->cmd == DLM_USER_REMOVE_LOCKSPACE) {
-
-		kparams->i.lspace.flags = k32params->i.lspace.flags;
-		kparams->i.lspace.minor = k32params->i.lspace.minor;
-		strcpy(kparams->i.lspace.name, k32params->i.lspace.name);
-	}
-	else {
-		kparams->i.lock.mode = k32params->i.lock.mode;
-		kparams->i.lock.namelen = k32params->i.lock.namelen;
-		kparams->i.lock.flags = k32params->i.lock.flags;
-		kparams->i.lock.lkid = k32params->i.lock.lkid;
-		kparams->i.lock.parent = k32params->i.lock.parent;
-		kparams->i.lock.castparam = (void *)(long)k32params->i.lock.castparam;
-		kparams->i.lock.castaddr = (void *)(long)k32params->i.lock.castaddr;
-		kparams->i.lock.bastparam = (void *)(long)k32params->i.lock.bastparam;
-		kparams->i.lock.bastaddr = (void *)(long)k32params->i.lock.bastaddr;
-		kparams->i.lock.lksb = (void *)(long)k32params->i.lock.lksb;
-		memcpy(kparams->i.lock.lvb, k32params->i.lock.lvb, DLM_USER_LVB_LEN);
-		memcpy(kparams->i.lock.name, k32params->i.lock.name, kparams->i.lock.namelen);
-	}
-}
-
-void compat_output(struct dlm_lock_result *res, struct dlm_lock_result32 *res32)
-{
-	res32->length = res->length - (sizeof(struct dlm_lock_result) - sizeof(struct dlm_lock_result32));
-	res32->user_astaddr = (__u32)(long)res->user_astaddr;
-	res32->user_astparam = (__u32)(long)res->user_astparam;
-	res32->user_lksb = (__u32)(long)res->user_lksb;
-	res32->bast_mode = res->bast_mode;
-
-	res32->lvb_offset = res->lvb_offset;
-	res32->length = res->length;
-
-	res32->lksb.sb_status = res->lksb.sb_status;
-	res32->lksb.sb_flags = res->lksb.sb_flags;
-	res32->lksb.sb_lkid = res->lksb.sb_lkid;
-	res32->lksb.sb_lvbptr = (__u32)(long)res->lksb.sb_lvbptr;
-}
-#endif
-
-
-/* get and put ops for file_info.
-   Actually I don't really like "get" and "put", but everyone
-   else seems to use them and I can't think of anything
-   nicer at the moment */
-static void get_file_info(struct file_info *f)
-{
-	atomic_inc(&f->fi_refcnt);
-}
-
-static void put_file_info(struct file_info *f)
-{
-	if (atomic_dec_and_test(&f->fi_refcnt))
-		kfree(f);
-}
-
-static void release_lockinfo(struct user_ls *ls, struct lock_info *li)
-{
-	put_file_info(li->li_file);
-
-	write_lock(&ls->lockinfo_lock);
-	idr_remove(&ls->lockinfo_idr, li->li_lksb.sb_lkid);
-	write_unlock(&ls->lockinfo_lock);
-
-	if (li->li_lksb.sb_lvbptr)
-		kfree(li->li_lksb.sb_lvbptr);
-	kfree(li);
-
-	module_put(THIS_MODULE);
-}
-
-static struct lock_info *get_lockinfo(struct user_ls *ls, uint32_t lockid)
-{
-	struct lock_info *li;
-
-	read_lock(&ls->lockinfo_lock);
-	li = idr_find(&ls->lockinfo_idr, lockid);
-	read_unlock(&ls->lockinfo_lock);
-
-	return li;
-}
-
-static int add_lockinfo(struct user_ls *ls, struct lock_info *li)
-{
-	int n;
-	int r;
-	int ret = -EINVAL;
-
-	write_lock(&ls->lockinfo_lock);
-
-	if (idr_find(&ls->lockinfo_idr, li->li_lksb.sb_lkid))
-		goto out_up;
-
-	ret = -ENOMEM;
-	r = idr_pre_get(&ls->lockinfo_idr, GFP_KERNEL);
-	if (!r)
-		goto out_up;
-
-	r = idr_get_new_above(&ls->lockinfo_idr, li, li->li_lksb.sb_lkid, &n);
-	if (r)
-		goto out_up;
-
-	if (n != li->li_lksb.sb_lkid) {
-		idr_remove(&ls->lockinfo_idr, n);
-		goto out_up;
-	}
-
-	ret = 0;
-
- out_up:
-	write_unlock(&ls->lockinfo_lock);
-
-	return ret;
-}
-
-
-static struct user_ls *__find_lockspace(int minor)
-{
-	struct user_ls *lsinfo;
-
-	list_for_each_entry(lsinfo, &user_ls_list, ls_list) {
-		if (lsinfo->ls_miscinfo.minor == minor)
-			return lsinfo;
-	}
-	return NULL;
-}
-
-/* Find a lockspace struct given the device minor number */
-static struct user_ls *find_lockspace(int minor)
-{
-	struct user_ls *lsinfo;
-
-	mutex_lock(&user_ls_lock);
-	lsinfo = __find_lockspace(minor);
-	mutex_unlock(&user_ls_lock);
-
-	return lsinfo;
-}
-
-static void add_lockspace_to_list(struct user_ls *lsinfo)
-{
-	mutex_lock(&user_ls_lock);
-	list_add(&lsinfo->ls_list, &user_ls_list);
-	mutex_unlock(&user_ls_lock);
-}
-
-/* Register a lockspace with the DLM and create a misc
-   device for userland to access it */
-static int register_lockspace(char *name, struct user_ls **ls, int flags)
-{
-	struct user_ls *newls;
-	int status;
-	int namelen;
-
-	namelen = strlen(name)+strlen(name_prefix)+2;
-
-	newls = kzalloc(sizeof(struct user_ls), GFP_KERNEL);
-	if (!newls)
-		return -ENOMEM;
-
-	newls->ls_miscinfo.name = kzalloc(namelen, GFP_KERNEL);
-	if (!newls->ls_miscinfo.name) {
-		kfree(newls);
-		return -ENOMEM;
-	}
-
-	status = dlm_new_lockspace(name, strlen(name), &newls->ls_lockspace, 0,
-				   DLM_USER_LVB_LEN);
-	if (status != 0) {
-		kfree(newls->ls_miscinfo.name);
-		kfree(newls);
-		return status;
-	}
-
-	idr_init(&newls->lockinfo_idr);
-	rwlock_init(&newls->lockinfo_lock);
-
-	snprintf((char*)newls->ls_miscinfo.name, namelen, "%s_%s",
-		 name_prefix, name);
-
-	newls->ls_miscinfo.fops = &_dlm_fops;
-	newls->ls_miscinfo.minor = MISC_DYNAMIC_MINOR;
-
-	status = misc_register(&newls->ls_miscinfo);
-	if (status) {
-		printk(KERN_ERR "dlm: misc register failed for %s\n", name);
-		dlm_release_lockspace(newls->ls_lockspace, 0);
-		kfree(newls->ls_miscinfo.name);
-		kfree(newls);
-		return status;
-	}
-
-	if (flags & DLM_USER_LSFLG_AUTOFREE)
-		set_bit(LS_FLAG_AUTOFREE, &newls->ls_flags);
-
-	add_lockspace_to_list(newls);
-	*ls = newls;
-	return 0;
-}
-
-/* Called with the user_ls_lock mutex held */
-static int unregister_lockspace(struct user_ls *lsinfo, int force)
-{
-	int status;
-
-	status = dlm_release_lockspace(lsinfo->ls_lockspace, force);
-	if (status)
-		return status;
-
-	status = misc_deregister(&lsinfo->ls_miscinfo);
-	if (status)
-		return status;
-
-	list_del(&lsinfo->ls_list);
-	set_bit(LS_FLAG_DELETED, &lsinfo->ls_flags);
-	lsinfo->ls_lockspace = NULL;
-	if (atomic_read(&lsinfo->ls_refcnt) == 0) {
-		kfree(lsinfo->ls_miscinfo.name);
-		kfree(lsinfo);
-	}
-
-	return 0;
-}
-
-/* Add it to userland's AST queue */
-static void add_to_astqueue(struct lock_info *li, void *astaddr, void *astparam,
-			    int lvb_updated)
-{
-	struct ast_info *ast = kzalloc(sizeof(struct ast_info), GFP_KERNEL);
-	if (!ast)
-		return;
-
-	ast->result.user_astparam = astparam;
-	ast->result.user_astaddr  = astaddr;
-	ast->result.user_lksb     = li->li_user_lksb;
-	memcpy(&ast->result.lksb, &li->li_lksb, sizeof(struct dlm_lksb));
-	ast->lvb_updated = lvb_updated;
-
-	spin_lock(&li->li_file->fi_ast_lock);
-	list_add_tail(&ast->list, &li->li_file->fi_ast_list);
-	spin_unlock(&li->li_file->fi_ast_lock);
-	wake_up_interruptible(&li->li_file->fi_wait);
-}
-
-static void bast_routine(void *param, int mode)
-{
-	struct lock_info *li = param;
-
-	if (li && li->li_bastaddr)
-		add_to_astqueue(li, li->li_bastaddr, li->li_bastparam, 0);
-}
-
-/*
- * This is the kernel's AST routine.
- * All lock, unlock & query operations complete here.
- * The only syncronous ops are those done during device close.
- */
-static void ast_routine(void *param)
-{
-	struct lock_info *li = param;
-
-	/* Param may be NULL if a persistent lock is unlocked by someone else */
-	if (!li)
-		return;
-
-	/* If this is a succesful conversion then activate the blocking ast
-	 * args from the conversion request */
-	if (!test_bit(LI_FLAG_FIRSTLOCK, &li->li_flags) &&
-	    li->li_lksb.sb_status == 0) {
-
-		li->li_bastparam = li->li_pend_bastparam;
-		li->li_bastaddr = li->li_pend_bastaddr;
-		li->li_pend_bastaddr = NULL;
-	}
-
-	/* If it's an async request then post data to the user's AST queue. */
-	if (li->li_castaddr) {
-		int lvb_updated = 0;
-
-		/* See if the lvb has been updated */
-		if (dlm_lvb_operations[li->li_grmode+1][li->li_rqmode+1] == 1)
-			lvb_updated = 1;
-
-		if (li->li_lksb.sb_status == 0)
-			li->li_grmode = li->li_rqmode;
-
-		/* Only queue AST if the device is still open */
-		if (test_bit(FI_FLAG_OPEN, &li->li_file->fi_flags))
-			add_to_astqueue(li, li->li_castaddr, li->li_castparam,
-					lvb_updated);
-
-		/* If it's a new lock operation that failed, then
-		 * remove it from the owner queue and free the
-		 * lock_info.
-		 */
-		if (test_and_clear_bit(LI_FLAG_FIRSTLOCK, &li->li_flags) &&
-		    li->li_lksb.sb_status != 0) {
-
-			/* Wait till dlm_lock() has finished */
-			wait_for_completion(&li->li_firstcomp);
-
-			spin_lock(&li->li_file->fi_li_lock);
-			list_del(&li->li_ownerqueue);
-			clear_bit(LI_FLAG_ONLIST, &li->li_flags);
-			spin_unlock(&li->li_file->fi_li_lock);
-			release_lockinfo(li->li_file->fi_ls, li);
-			return;
-		}
-		/* Free unlocks & queries */
-		if (li->li_lksb.sb_status == -DLM_EUNLOCK ||
-		    li->li_cmd == DLM_USER_QUERY) {
-			release_lockinfo(li->li_file->fi_ls, li);
-		}
-	} else {
-		/* Synchronous request, just wake up the caller */
-		set_bit(LI_FLAG_COMPLETE, &li->li_flags);
-		wake_up_interruptible(&li->li_waitq);
-	}
-}
-
-/*
- * Wait for the lock op to complete and return the status.
- */
-static int wait_for_ast(struct lock_info *li)
-{
-	/* Wait for the AST routine to complete */
-	set_task_state(current, TASK_INTERRUPTIBLE);
-	while (!test_bit(LI_FLAG_COMPLETE, &li->li_flags))
-		schedule();
-
-	set_task_state(current, TASK_RUNNING);
-
-	return li->li_lksb.sb_status;
-}
-
-
-/* Open on control device */
-static int dlm_ctl_open(struct inode *inode, struct file *file)
-{
-	file->private_data = NULL;
-	return 0;
-}
-
-/* Close on control device */
-static int dlm_ctl_close(struct inode *inode, struct file *file)
-{
-	return 0;
-}
-
-/* Open on lockspace device */
-static int dlm_open(struct inode *inode, struct file *file)
-{
-	struct file_info *f;
-	struct user_ls *lsinfo;
-
-	lsinfo = find_lockspace(iminor(inode));
-	if (!lsinfo)
-		return -ENOENT;
-
-	f = kzalloc(sizeof(struct file_info), GFP_KERNEL);
-	if (!f)
-		return -ENOMEM;
-
-	atomic_inc(&lsinfo->ls_refcnt);
-	INIT_LIST_HEAD(&f->fi_li_list);
-	INIT_LIST_HEAD(&f->fi_ast_list);
-	spin_lock_init(&f->fi_li_lock);
-	spin_lock_init(&f->fi_ast_lock);
-	init_waitqueue_head(&f->fi_wait);
-	f->fi_ls = lsinfo;
-	f->fi_flags = 0;
-	get_file_info(f);
-	set_bit(FI_FLAG_OPEN, &f->fi_flags);
-
-	file->private_data = f;
-
-	return 0;
-}
-
-/* Check the user's version matches ours */
-static int check_version(struct dlm_write_request *req)
-{
-	if (req->version[0] != DLM_DEVICE_VERSION_MAJOR ||
-	    (req->version[0] == DLM_DEVICE_VERSION_MAJOR &&
-	     req->version[1] > DLM_DEVICE_VERSION_MINOR)) {
-
-		printk(KERN_DEBUG "dlm: process %s (%d) version mismatch "
-		       "user (%d.%d.%d) kernel (%d.%d.%d)\n",
-		       current->comm,
-		       current->pid,
-		       req->version[0],
-		       req->version[1],
-		       req->version[2],
-		       DLM_DEVICE_VERSION_MAJOR,
-		       DLM_DEVICE_VERSION_MINOR,
-		       DLM_DEVICE_VERSION_PATCH);
-		return -EINVAL;
-	}
-	return 0;
-}
-
-/* Close on lockspace device */
-static int dlm_close(struct inode *inode, struct file *file)
-{
-	struct file_info *f = file->private_data;
-	struct lock_info li;
-	struct lock_info *old_li, *safe;
-	sigset_t tmpsig;
-	sigset_t allsigs;
-	struct user_ls *lsinfo;
-	DECLARE_WAITQUEUE(wq, current);
-
-	lsinfo = find_lockspace(iminor(inode));
-	if (!lsinfo)
-		return -ENOENT;
-
-	/* Mark this closed so that ASTs will not be delivered any more */
-	clear_bit(FI_FLAG_OPEN, &f->fi_flags);
-
-	/* Block signals while we are doing this */
-	sigfillset(&allsigs);
-	sigprocmask(SIG_BLOCK, &allsigs, &tmpsig);
-
-	/* We use our own lock_info struct here, so that any
-	 * outstanding "real" ASTs will be delivered with the
-	 * corresponding "real" params, thus freeing the lock_info
-	 * that belongs the lock. This catches the corner case where
-	 * a lock is BUSY when we try to unlock it here
-	 */
-	memset(&li, 0, sizeof(li));
-	clear_bit(LI_FLAG_COMPLETE, &li.li_flags);
-	init_waitqueue_head(&li.li_waitq);
-	add_wait_queue(&li.li_waitq, &wq);
-
-	/*
-	 * Free any outstanding locks, they are on the
-	 * list in LIFO order so there should be no problems
-	 * about unlocking parents before children.
-	 */
-	list_for_each_entry_safe(old_li, safe, &f->fi_li_list, li_ownerqueue) {
-		int status;
-		int flags = 0;
-
-		/* Don't unlock persistent locks, just mark them orphaned */
-		if (test_bit(LI_FLAG_PERSISTENT, &old_li->li_flags)) {
-			list_del(&old_li->li_ownerqueue);
-
-			/* Update master copy */
-			/* TODO: Check locking core updates the local and
-			   remote ORPHAN flags */
-			li.li_lksb.sb_lkid = old_li->li_lksb.sb_lkid;
-			status = dlm_lock(f->fi_ls->ls_lockspace,
-					  old_li->li_grmode, &li.li_lksb,
-					  DLM_LKF_CONVERT|DLM_LKF_ORPHAN,
-					  NULL, 0, 0, ast_routine, NULL, NULL);
-			if (status != 0)
-				printk("dlm: Error orphaning lock %x: %d\n",
-				       old_li->li_lksb.sb_lkid, status);
-
-			/* But tidy our references in it */
-			release_lockinfo(old_li->li_file->fi_ls, old_li);
-			continue;
-		}
-
-		clear_bit(LI_FLAG_COMPLETE, &li.li_flags);
-
-		flags = DLM_LKF_FORCEUNLOCK;
-		if (old_li->li_grmode >= DLM_LOCK_PW)
-			flags |= DLM_LKF_IVVALBLK;
-
-		status = dlm_unlock(f->fi_ls->ls_lockspace,
-				    old_li->li_lksb.sb_lkid, flags,
-				    &li.li_lksb, &li);
-
-		/* Must wait for it to complete as the next lock could be its
-		 * parent */
-		if (status == 0)
-			wait_for_ast(&li);
-
-		/* Unlock suceeded, free the lock_info struct. */
-		if (status == 0)
-			release_lockinfo(old_li->li_file->fi_ls, old_li);
-	}
-
-	remove_wait_queue(&li.li_waitq, &wq);
-
-	/*
-	 * If this is the last reference to the lockspace
-	 * then free the struct. If it's an AUTOFREE lockspace
-	 * then free the whole thing.
-	 */
-	mutex_lock(&user_ls_lock);
-	if (atomic_dec_and_test(&lsinfo->ls_refcnt)) {
-
-		if (lsinfo->ls_lockspace) {
-			if (test_bit(LS_FLAG_AUTOFREE, &lsinfo->ls_flags)) {
-				unregister_lockspace(lsinfo, 1);
-			}
-		} else {
-			kfree(lsinfo->ls_miscinfo.name);
-			kfree(lsinfo);
-		}
-	}
-	mutex_unlock(&user_ls_lock);
-	put_file_info(f);
-
-	/* Restore signals */
-	sigprocmask(SIG_SETMASK, &tmpsig, NULL);
-	recalc_sigpending();
-
-	return 0;
-}
-
-static int do_user_create_lockspace(struct file_info *fi, uint8_t cmd,
-				    struct dlm_lspace_params *kparams)
-{
-	int status;
-	struct user_ls *lsinfo;
-
-	if (!capable(CAP_SYS_ADMIN))
-		return -EPERM;
-
-	status = register_lockspace(kparams->name, &lsinfo, kparams->flags);
-
-	/* If it succeeded then return the minor number */
-	if (status == 0)
-		status = lsinfo->ls_miscinfo.minor;
-
-	return status;
-}
-
-static int do_user_remove_lockspace(struct file_info *fi, uint8_t cmd,
-				    struct dlm_lspace_params *kparams)
-{
-	int status;
-	int force = 1;
-	struct user_ls *lsinfo;
-
-	if (!capable(CAP_SYS_ADMIN))
-		return -EPERM;
-
-	mutex_lock(&user_ls_lock);
-	lsinfo = __find_lockspace(kparams->minor);
-	if (!lsinfo) {
-		mutex_unlock(&user_ls_lock);
-		return -EINVAL;
-	}
-
-	if (kparams->flags & DLM_USER_LSFLG_FORCEFREE)
-		force = 3;
-
-	status = unregister_lockspace(lsinfo, force);
-	mutex_unlock(&user_ls_lock);
-
-	return status;
-}
-
-/* Read call, might block if no ASTs are waiting.
- * It will only ever return one message at a time, regardless
- * of how many are pending.
- */
-static ssize_t dlm_read(struct file *file, char __user *buffer, size_t count,
-			loff_t *ppos)
-{
-	struct file_info *fi = file->private_data;
-	struct ast_info *ast;
-	void *data;
-	int data_size;
-	int struct_size;
-	int offset;
-	DECLARE_WAITQUEUE(wait, current);
-#ifdef CONFIG_COMPAT
-	struct dlm_lock_result32 result32;
-
-	if (count < sizeof(struct dlm_lock_result32))
-#else
-	if (count < sizeof(struct dlm_lock_result))
-#endif
-		return -EINVAL;
-
-	spin_lock(&fi->fi_ast_lock);
-	if (list_empty(&fi->fi_ast_list)) {
-
-		/* No waiting ASTs.
-		 * Return EOF if the lockspace been deleted.
-		 */
-		if (test_bit(LS_FLAG_DELETED, &fi->fi_ls->ls_flags))
-			return 0;
-
-		if (file->f_flags & O_NONBLOCK) {
-			spin_unlock(&fi->fi_ast_lock);
-			return -EAGAIN;
-		}
-
-		add_wait_queue(&fi->fi_wait, &wait);
-
-	repeat:
-		set_current_state(TASK_INTERRUPTIBLE);
-		if (list_empty(&fi->fi_ast_list) &&
-		    !signal_pending(current)) {
-
-			spin_unlock(&fi->fi_ast_lock);
-			schedule();
-			spin_lock(&fi->fi_ast_lock);
-			goto repeat;
-		}
-
-		current->state = TASK_RUNNING;
-		remove_wait_queue(&fi->fi_wait, &wait);
-
-		if (signal_pending(current)) {
-			spin_unlock(&fi->fi_ast_lock);
-			return -ERESTARTSYS;
-		}
-	}
-
-	ast = list_entry(fi->fi_ast_list.next, struct ast_info, list);
-	list_del(&ast->list);
-	spin_unlock(&fi->fi_ast_lock);
-
-	/* Work out the size of the returned data */
-#ifdef CONFIG_COMPAT
-	if (test_bit(FI_FLAG_COMPAT, &fi->fi_flags)) {
-		data_size = struct_size = sizeof(struct dlm_lock_result32);
-		data = &result32;
-	}
-	else
-#endif
-	{
-		data_size = struct_size = sizeof(struct dlm_lock_result);
-		data = &ast->result;
-	}
-	if (ast->lvb_updated && ast->result.lksb.sb_lvbptr)
-		data_size += DLM_USER_LVB_LEN;
-
-	offset = struct_size;
-
-	/* Room for the extended data ? */
-	if (count >= data_size) {
-
-		if (ast->lvb_updated && ast->result.lksb.sb_lvbptr) {
-			if (copy_to_user(buffer+offset,
-					 ast->result.lksb.sb_lvbptr,
-					 DLM_USER_LVB_LEN))
-				return -EFAULT;
-			ast->result.lvb_offset = offset;
-			offset += DLM_USER_LVB_LEN;
-		}
-	}
-
-	ast->result.length = data_size;
-
-#ifdef CONFIG_COMPAT
-	compat_output(&ast->result, &result32);
-#endif
-
-	/* Copy the header now it has all the offsets in it */
-	if (copy_to_user(buffer, data, struct_size))
-		offset = -EFAULT;
-
-	/* If we only returned a header and there's more to come then put it
-	   back on the list */
-	if (count < data_size) {
-		spin_lock(&fi->fi_ast_lock);
-		list_add(&ast->list, &fi->fi_ast_list);
-		spin_unlock(&fi->fi_ast_lock);
-	} else
-		kfree(ast);
-	return offset;
-}
-
-static unsigned int dlm_poll(struct file *file, poll_table *wait)
-{
-	struct file_info *fi = file->private_data;
-
-	poll_wait(file, &fi->fi_wait, wait);
-
-	spin_lock(&fi->fi_ast_lock);
-	if (!list_empty(&fi->fi_ast_list)) {
-		spin_unlock(&fi->fi_ast_lock);
-		return POLLIN | POLLRDNORM;
-	}
-
-	spin_unlock(&fi->fi_ast_lock);
-	return 0;
-}
-
-static struct lock_info *allocate_lockinfo(struct file_info *fi, uint8_t cmd,
-					   struct dlm_lock_params *kparams)
-{
-	struct lock_info *li;
-
-	if (!try_module_get(THIS_MODULE))
-		return NULL;
-
-	li = kzalloc(sizeof(struct lock_info), GFP_KERNEL);
-	if (li) {
-		li->li_magic     = LOCKINFO_MAGIC;
-		li->li_file      = fi;
-		li->li_cmd       = cmd;
-		li->li_flags     = 0;
-		li->li_grmode    = -1;
-		li->li_rqmode    = -1;
-		li->li_pend_bastparam = NULL;
-		li->li_pend_bastaddr  = NULL;
-		li->li_castaddr   = NULL;
-		li->li_castparam  = NULL;
-		li->li_lksb.sb_lvbptr = NULL;
-		li->li_bastaddr  = kparams->bastaddr;
-		li->li_bastparam = kparams->bastparam;
-
-		get_file_info(fi);
-	}
-	return li;
-}
-
-static int do_user_lock(struct file_info *fi, uint8_t cmd,
-			struct dlm_lock_params *kparams)
-{
-	struct lock_info *li;
-	int status;
-
-	/*
-	 * Validate things that we need to have correct.
-	 */
-	if (!kparams->castaddr)
-		return -EINVAL;
-
-	if (!kparams->lksb)
-		return -EINVAL;
-
-	/* Persistent child locks are not available yet */
-	if ((kparams->flags & DLM_LKF_PERSISTENT) && kparams->parent)
-		return -EINVAL;
-
-        /* For conversions, there should already be a lockinfo struct,
-	   unless we are adopting an orphaned persistent lock */
-	if (kparams->flags & DLM_LKF_CONVERT) {
-
-		li = get_lockinfo(fi->fi_ls, kparams->lkid);
-
-		/* If this is a persistent lock we will have to create a
-		   lockinfo again */
-		if (!li && (kparams->flags & DLM_LKF_PERSISTENT)) {
-			li = allocate_lockinfo(fi, cmd, kparams);
-			if (!li)
-				return -ENOMEM;
-
-			li->li_lksb.sb_lkid = kparams->lkid;
-			li->li_castaddr  = kparams->castaddr;
-			li->li_castparam = kparams->castparam;
-
-			/* OK, this isn't exactly a FIRSTLOCK but it is the
-			   first time we've used this lockinfo, and if things
-			   fail we want rid of it */
-			init_completion(&li->li_firstcomp);
-			set_bit(LI_FLAG_FIRSTLOCK, &li->li_flags);
-			add_lockinfo(fi->fi_ls, li);
-
-			/* TODO: do a query to get the current state ?? */
-		}
-		if (!li)
-			return -EINVAL;
-
-		if (li->li_magic != LOCKINFO_MAGIC)
-			return -EINVAL;
-
-		/* For conversions don't overwrite the current blocking AST
-		   info so that:
-		   a) if a blocking AST fires before the conversion is queued
-		      it runs the current handler
-		   b) if the conversion is cancelled, the original blocking AST
-		      declaration is active
-		   The pend_ info is made active when the conversion
-		   completes.
-		*/
-		li->li_pend_bastaddr  = kparams->bastaddr;
-		li->li_pend_bastparam = kparams->bastparam;
-	} else {
-		li = allocate_lockinfo(fi, cmd, kparams);
-		if (!li)
-			return -ENOMEM;
-
-		/* Allow us to complete our work before
-  		   the AST routine runs. In fact we only need (and use) this
-		   when the initial lock fails */
-		init_completion(&li->li_firstcomp);
-		set_bit(LI_FLAG_FIRSTLOCK, &li->li_flags);
-	}
-
-	li->li_user_lksb = kparams->lksb;
-	li->li_castaddr  = kparams->castaddr;
-	li->li_castparam = kparams->castparam;
-	li->li_lksb.sb_lkid = kparams->lkid;
-	li->li_rqmode    = kparams->mode;
-	if (kparams->flags & DLM_LKF_PERSISTENT)
-		set_bit(LI_FLAG_PERSISTENT, &li->li_flags);
-
-	/* Copy in the value block */
-	if (kparams->flags & DLM_LKF_VALBLK) {
-		if (!li->li_lksb.sb_lvbptr) {
-			li->li_lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN,
-							GFP_KERNEL);
-			if (!li->li_lksb.sb_lvbptr) {
-				status = -ENOMEM;
-				goto out_err;
-			}
-		}
-
-		memcpy(li->li_lksb.sb_lvbptr, kparams->lvb, DLM_USER_LVB_LEN);
-	}
-
-	/* Lock it ... */
-	status = dlm_lock(fi->fi_ls->ls_lockspace,
-			  kparams->mode, &li->li_lksb,
-			  kparams->flags,
-			  kparams->name, kparams->namelen,
-			  kparams->parent,
-			  ast_routine,
-			  li,
-			  (li->li_pend_bastaddr || li->li_bastaddr) ?
-			   bast_routine : NULL);
-	if (status)
-		goto out_err;
-
-	/* If it succeeded (this far) with a new lock then keep track of
-	   it on the file's lockinfo list */
-	if (!status && test_bit(LI_FLAG_FIRSTLOCK, &li->li_flags)) {
-
-		spin_lock(&fi->fi_li_lock);
-		list_add(&li->li_ownerqueue, &fi->fi_li_list);
-		set_bit(LI_FLAG_ONLIST, &li->li_flags);
-		spin_unlock(&fi->fi_li_lock);
-		if (add_lockinfo(fi->fi_ls, li))
-			printk(KERN_WARNING "Add lockinfo failed\n");
-
-		complete(&li->li_firstcomp);
-	}
-
-	/* Return the lockid as the user needs it /now/ */
-	return li->li_lksb.sb_lkid;
-
- out_err:
-	if (test_bit(LI_FLAG_FIRSTLOCK, &li->li_flags))
-		release_lockinfo(fi->fi_ls, li);
-	return status;
-
-}
-
-static int do_user_unlock(struct file_info *fi, uint8_t cmd,
-			  struct dlm_lock_params *kparams)
-{
-	struct lock_info *li;
-	int status;
-	int convert_cancel = 0;
-
-	li = get_lockinfo(fi->fi_ls, kparams->lkid);
-	if (!li) {
-		li = allocate_lockinfo(fi, cmd, kparams);
-		if (!li)
-			return -ENOMEM;
-		spin_lock(&fi->fi_li_lock);
-		list_add(&li->li_ownerqueue, &fi->fi_li_list);
-		set_bit(LI_FLAG_ONLIST, &li->li_flags);
-		spin_unlock(&fi->fi_li_lock);
-	}
-
-	if (li->li_magic != LOCKINFO_MAGIC)
-		return -EINVAL;
-
-	li->li_user_lksb = kparams->lksb;
-	li->li_castparam = kparams->castparam;
-	li->li_cmd       = cmd;
-
-	/* Cancelling a conversion doesn't remove the lock...*/
-	if (kparams->flags & DLM_LKF_CANCEL && li->li_grmode != -1)
-		convert_cancel = 1;
-
-	/* Wait until dlm_lock() has completed */
-	if (!test_bit(LI_FLAG_ONLIST, &li->li_flags)) {
-		wait_for_completion(&li->li_firstcomp);
-	}
-
-	/* dlm_unlock() passes a 0 for castaddr which means don't overwrite
-	   the existing li_castaddr as that's the completion routine for
-	   unlocks. dlm_unlock_wait() specifies a new AST routine to be
-	   executed when the unlock completes. */
-	if (kparams->castaddr)
-		li->li_castaddr = kparams->castaddr;
-
-	/* Use existing lksb & astparams */
-	status = dlm_unlock(fi->fi_ls->ls_lockspace,
-			     kparams->lkid,
-			     kparams->flags, &li->li_lksb, li);
-
-	if (!status && !convert_cancel) {
-		spin_lock(&fi->fi_li_lock);
-		list_del(&li->li_ownerqueue);
-		clear_bit(LI_FLAG_ONLIST, &li->li_flags);
-		spin_unlock(&fi->fi_li_lock);
-	}
-
-	return status;
-}
-
-/* Write call, submit a locking request */
-static ssize_t dlm_write(struct file *file, const char __user *buffer,
-			 size_t count, loff_t *ppos)
-{
-	struct file_info *fi = file->private_data;
-	struct dlm_write_request *kparams;
-	sigset_t tmpsig;
-	sigset_t allsigs;
-	int status;
-
-#ifdef CONFIG_COMPAT
-	if (count < sizeof(struct dlm_write_request32))
-#else
-	if (count < sizeof(struct dlm_write_request))
-#endif
-		return -EINVAL;
-
-	if (count > sizeof(struct dlm_write_request) + DLM_RESNAME_MAXLEN)
-		return -EINVAL;
-
-	/* Has the lockspace been deleted */
-	if (fi && test_bit(LS_FLAG_DELETED, &fi->fi_ls->ls_flags))
-		return -ENOENT;
-
-	kparams = kmalloc(count, GFP_KERNEL);
-	if (!kparams)
-		return -ENOMEM;
-
-	status = -EFAULT;
-	/* Get the command info */
-	if (copy_from_user(kparams, buffer, count))
-		goto out_free;
-
-	status = -EBADE;
-	if (check_version(kparams))
-		goto out_free;
-
-#ifdef CONFIG_COMPAT
-	if (!kparams->is64bit) {
-		struct dlm_write_request32 *k32params = (struct dlm_write_request32 *)kparams;
-		kparams = kmalloc(count + (sizeof(struct dlm_write_request) - sizeof(struct dlm_write_request32)), GFP_KERNEL);
-		if (!kparams)
-			return -ENOMEM;
-
-		if (fi)
-			set_bit(FI_FLAG_COMPAT, &fi->fi_flags);
-		compat_input(kparams, k32params);
-		kfree(k32params);
-	}
-#endif
-
-	/* Block signals while we are doing this */
-	sigfillset(&allsigs);
-	sigprocmask(SIG_BLOCK, &allsigs, &tmpsig);
-
-	status = -EINVAL;
-	switch (kparams->cmd)
-	{
-	case DLM_USER_LOCK:
-		if (!fi) goto out_sig;
-		status = do_user_lock(fi, kparams->cmd, &kparams->i.lock);
-		break;
-
-	case DLM_USER_UNLOCK:
-		if (!fi) goto out_sig;
-		status = do_user_unlock(fi, kparams->cmd, &kparams->i.lock);
-		break;
-
-	case DLM_USER_CREATE_LOCKSPACE:
-		if (fi) goto out_sig;
-		status = do_user_create_lockspace(fi, kparams->cmd,
-						  &kparams->i.lspace);
-		break;
-
-	case DLM_USER_REMOVE_LOCKSPACE:
-		if (fi) goto out_sig;
-		status = do_user_remove_lockspace(fi, kparams->cmd,
-						  &kparams->i.lspace);
-		break;
-	default:
-		printk("Unknown command passed to DLM device : %d\n",
-			kparams->cmd);
-		break;
-	}
-
- out_sig:
-	/* Restore signals */
-	sigprocmask(SIG_SETMASK, &tmpsig, NULL);
-	recalc_sigpending();
-
- out_free:
-	kfree(kparams);
-	if (status == 0)
-		return count;
-	else
-		return status;
-}
-
-static struct file_operations _dlm_fops = {
-      .open    = dlm_open,
-      .release = dlm_close,
-      .read    = dlm_read,
-      .write   = dlm_write,
-      .poll    = dlm_poll,
-      .owner   = THIS_MODULE,
-};
-
-static struct file_operations _dlm_ctl_fops = {
-      .open    = dlm_ctl_open,
-      .release = dlm_ctl_close,
-      .write   = dlm_write,
-      .owner   = THIS_MODULE,
-};
-
-/*
- * Create control device
- */
-static int __init dlm_device_init(void)
-{
-	int r;
-
-	INIT_LIST_HEAD(&user_ls_list);
-	mutex_init(&user_ls_lock);
-
-	ctl_device.name = "dlm-control";
-	ctl_device.fops = &_dlm_ctl_fops;
-	ctl_device.minor = MISC_DYNAMIC_MINOR;
-
-	r = misc_register(&ctl_device);
-	if (r) {
-		printk(KERN_ERR "dlm: misc_register failed for control dev\n");
-		return r;
-	}
-
-	return 0;
-}
-
-static void __exit dlm_device_exit(void)
-{
-	misc_deregister(&ctl_device);
-}
-
-MODULE_DESCRIPTION("Distributed Lock Manager device interface");
-MODULE_AUTHOR("Red Hat, Inc.");
-MODULE_LICENSE("GPL");
-
-module_init(dlm_device_init);
-module_exit(dlm_device_exit);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 149106f..db080de 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -35,6 +35,7 @@
 #include <linux/kref.h>
 #include <linux/kernel.h>
 #include <linux/jhash.h>
+#include <linux/miscdevice.h>
 #include <linux/mutex.h>
 #include <asm/semaphore.h>
 #include <asm/uaccess.h>
@@ -68,6 +69,7 @@
 #define log_error(ls, fmt, args...) \
 	printk(KERN_ERR "dlm: %s: " fmt "\n", (ls)->ls_name , ##args)
 
+#define DLM_LOG_DEBUG
 #ifdef DLM_LOG_DEBUG
 #define log_debug(ls, fmt, args...) log_error(ls, fmt, ##args)
 #else
@@ -204,6 +206,9 @@
 
 #define DLM_IFL_MSTCPY		0x00010000
 #define DLM_IFL_RESEND		0x00020000
+#define DLM_IFL_DEAD		0x00040000
+#define DLM_IFL_USER		0x00000001
+#define DLM_IFL_ORPHAN		0x00000002
 
 struct dlm_lkb {
 	struct dlm_rsb		*lkb_resource;	/* the rsb */
@@ -231,6 +236,7 @@
 	struct list_head	lkb_rsb_lookup;	/* waiting for rsb lookup */
 	struct list_head	lkb_wait_reply;	/* waiting for remote reply */
 	struct list_head	lkb_astqueue;	/* need ast to be sent */
+	struct list_head	lkb_ownqueue;	/* list of locks for a process */
 
 	char			*lkb_lvbptr;
 	struct dlm_lksb		*lkb_lksb;      /* caller's status block */
@@ -409,6 +415,7 @@
 
 struct dlm_ls {
 	struct list_head	ls_list;	/* list of lockspaces */
+	dlm_lockspace_t		*ls_local_handle;
 	uint32_t		ls_global_id;	/* global unique lockspace ID */
 	uint32_t		ls_exflags;
 	int			ls_lvblen;
@@ -444,6 +451,8 @@
 	wait_queue_head_t	ls_uevent_wait;	/* user part of join/leave */
 	int			ls_uevent_result;
 
+	struct miscdevice       ls_device;
+
 	/* recovery related */
 
 	struct timer_list	ls_timer;
@@ -461,6 +470,7 @@
 	spinlock_t		ls_recover_list_lock;
 	int			ls_recover_list_count;
 	wait_queue_head_t	ls_wait_general;
+	struct mutex		ls_clear_proc_locks;
 
 	struct list_head	ls_root_list;	/* root resources */
 	struct rw_semaphore	ls_root_sem;	/* protect root_list */
@@ -475,6 +485,40 @@
 #define LSFL_RCOM_READY		3
 #define LSFL_UEVENT_WAIT	4
 
+/* much of this is just saving user space pointers associated with the
+   lock that we pass back to the user lib with an ast */
+
+struct dlm_user_args {
+	struct dlm_user_proc	*proc; /* each process that opens the lockspace
+					  device has private data
+					  (dlm_user_proc) on the struct file,
+					  the process's locks point back to it*/
+	struct dlm_lksb		lksb;
+	int			old_mode;
+	int			update_user_lvb;
+	struct dlm_lksb __user	*user_lksb;
+	void __user		*castparam;
+	void __user		*castaddr;
+	void __user		*bastparam;
+	void __user		*bastaddr;
+};
+
+#define DLM_PROC_FLAGS_CLOSING 1
+#define DLM_PROC_FLAGS_COMPAT  2
+
+/* locks list is kept so we can remove all a process's locks when it
+   exits (or orphan those that are persistent) */
+
+struct dlm_user_proc {
+	dlm_lockspace_t		*lockspace;
+	unsigned long		flags; /* DLM_PROC_FLAGS */
+	struct list_head	asts;
+	spinlock_t		asts_spin;
+	struct list_head	locks;
+	spinlock_t		locks_spin;
+	wait_queue_head_t	wait;
+};
+
 static inline int dlm_locking_stopped(struct dlm_ls *ls)
 {
 	return !test_bit(LSFL_RUNNING, &ls->ls_flags);
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 5f69639..4e222f8 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -55,8 +55,9 @@
                                    R: do_xxxx()
    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
 */
-
+#include <linux/types.h>
 #include "dlm_internal.h"
+#include <linux/dlm_device.h>
 #include "memory.h"
 #include "lowcomms.h"
 #include "requestqueue.h"
@@ -69,6 +70,7 @@
 #include "rcom.h"
 #include "recover.h"
 #include "lvb_table.h"
+#include "user.h"
 #include "config.h"
 
 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
@@ -84,6 +86,8 @@
 				    struct dlm_message *ms);
 static int receive_extralen(struct dlm_message *ms);
 
+#define FAKE_USER_AST (void*)0xff00ff00
+
 /*
  * Lock compatibilty matrix - thanks Steve
  * UN = Unlocked state. Not really a state, used as a flag
@@ -152,7 +156,7 @@
         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
 };
 
-static void dlm_print_lkb(struct dlm_lkb *lkb)
+void dlm_print_lkb(struct dlm_lkb *lkb)
 {
 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
 	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
@@ -291,7 +295,7 @@
 		if (len == r->res_length && !memcmp(name, r->res_name, len))
 			goto found;
 	}
-	return -ENOENT;
+	return -EBADR;
 
  found:
 	if (r->res_nodeid && (flags & R_MASTER))
@@ -376,7 +380,7 @@
 	if (!error)
 		goto out;
 
-	if (error == -ENOENT && !(flags & R_CREATE))
+	if (error == -EBADR && !(flags & R_CREATE))
 		goto out;
 
 	/* the rsb was found but wasn't a master copy */
@@ -920,7 +924,7 @@
 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
 		return;
 
-	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
+	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
 	if (b == 1) {
 		int len = receive_extralen(ms);
 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
@@ -963,6 +967,8 @@
 	lkb->lkb_rqmode = DLM_LOCK_IV;
 
 	switch (lkb->lkb_status) {
+	case DLM_LKSTS_GRANTED:
+		break;
 	case DLM_LKSTS_CONVERT:
 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
 		break;
@@ -1727,6 +1733,11 @@
 	return -DLM_EUNLOCK;
 }
 
+/* FIXME: if revert_lock() finds that the lkb is granted, we should
+   skip the queue_cast(ECANCEL).  It indicates that the request/convert
+   completed (and queued a normal ast) just before the cancel; we don't
+   want to clobber the sb_result for the normal ast with ECANCEL. */
+   
 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
 	revert_lock(r, lkb);
@@ -2739,7 +2750,7 @@
 		confirm_master(r, error);
 		break;
 
-	case -ENOENT:
+	case -EBADR:
 	case -ENOTBLK:
 		/* find_rsb failed to find rsb or rsb wasn't master */
 		r->res_nodeid = -1;
@@ -3545,3 +3556,284 @@
 	return 0;
 }
 
+int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
+		     int mode, uint32_t flags, void *name, unsigned int namelen,
+		     uint32_t parent_lkid)
+{
+	struct dlm_lkb *lkb;
+	struct dlm_args args;
+	int error;
+
+	lock_recovery(ls);
+
+	error = create_lkb(ls, &lkb);
+	if (error) {
+		kfree(ua);
+		goto out;
+	}
+
+	if (flags & DLM_LKF_VALBLK) {
+		ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+		if (!ua->lksb.sb_lvbptr) {
+			kfree(ua);
+			__put_lkb(ls, lkb);
+			error = -ENOMEM;
+			goto out;
+		}
+	}
+
+	/* After ua is attached to lkb it will be freed by free_lkb().
+	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
+	   lock and that lkb_astparam is the dlm_user_args structure. */
+
+	error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
+			      FAKE_USER_AST, ua, FAKE_USER_AST, &args);
+	lkb->lkb_flags |= DLM_IFL_USER;
+	ua->old_mode = DLM_LOCK_IV;
+
+	if (error) {
+		__put_lkb(ls, lkb);
+		goto out;
+	}
+
+	error = request_lock(ls, lkb, name, namelen, &args);
+
+	switch (error) {
+	case 0:
+		break;
+	case -EINPROGRESS:
+		error = 0;
+		break;
+	case -EAGAIN:
+		error = 0;
+		/* fall through */
+	default:
+		__put_lkb(ls, lkb);
+		goto out;
+	}
+
+	/* add this new lkb to the per-process list of locks */
+	spin_lock(&ua->proc->locks_spin);
+	kref_get(&lkb->lkb_ref);
+	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
+	spin_unlock(&ua->proc->locks_spin);
+ out:
+	unlock_recovery(ls);
+	return error;
+}
+
+int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
+{
+	struct dlm_lkb *lkb;
+	struct dlm_args args;
+	struct dlm_user_args *ua;
+	int error;
+
+	lock_recovery(ls);
+
+	error = find_lkb(ls, lkid, &lkb);
+	if (error)
+		goto out;
+
+	/* user can change the params on its lock when it converts it, or
+	   add an lvb that didn't exist before */
+
+	ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
+		ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+		if (!ua->lksb.sb_lvbptr) {
+			error = -ENOMEM;
+			goto out_put;
+		}
+	}
+	if (lvb_in && ua->lksb.sb_lvbptr)
+		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
+
+	ua->castparam = ua_tmp->castparam;
+	ua->castaddr = ua_tmp->castaddr;
+	ua->bastparam = ua_tmp->bastparam;
+	ua->bastaddr = ua_tmp->bastaddr;
+	ua->old_mode = lkb->lkb_grmode;
+
+	error = set_lock_args(mode, &ua->lksb, flags, 0, 0, FAKE_USER_AST, ua,
+			      FAKE_USER_AST, &args);
+	if (error)
+		goto out_put;
+
+	error = convert_lock(ls, lkb, &args);
+
+	if (error == -EINPROGRESS || error == -EAGAIN)
+		error = 0;
+ out_put:
+	dlm_put_lkb(lkb);
+ out:
+	unlock_recovery(ls);
+	kfree(ua_tmp);
+	return error;
+}
+
+int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+		    uint32_t flags, uint32_t lkid, char *lvb_in)
+{
+	struct dlm_lkb *lkb;
+	struct dlm_args args;
+	struct dlm_user_args *ua;
+	int error;
+
+	lock_recovery(ls);
+
+	error = find_lkb(ls, lkid, &lkb);
+	if (error)
+		goto out;
+
+	ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+	if (lvb_in && ua->lksb.sb_lvbptr)
+		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
+	ua->castparam = ua_tmp->castparam;
+
+	error = set_unlock_args(flags, ua, &args);
+	if (error)
+		goto out_put;
+
+	error = unlock_lock(ls, lkb, &args);
+
+	if (error == -DLM_EUNLOCK)
+		error = 0;
+	if (error)
+		goto out_put;
+
+	spin_lock(&ua->proc->locks_spin);
+	list_del(&lkb->lkb_ownqueue);
+	spin_unlock(&ua->proc->locks_spin);
+
+	/* this removes the reference for the proc->locks list added by
+	   dlm_user_request */
+	unhold_lkb(lkb);
+ out_put:
+	dlm_put_lkb(lkb);
+ out:
+	unlock_recovery(ls);
+	return error;
+}
+
+int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+		    uint32_t flags, uint32_t lkid)
+{
+	struct dlm_lkb *lkb;
+	struct dlm_args args;
+	struct dlm_user_args *ua;
+	int error;
+
+	lock_recovery(ls);
+
+	error = find_lkb(ls, lkid, &lkb);
+	if (error)
+		goto out;
+
+	ua = (struct dlm_user_args *)lkb->lkb_astparam;
+	ua->castparam = ua_tmp->castparam;
+
+	error = set_unlock_args(flags, ua, &args);
+	if (error)
+		goto out_put;
+
+	error = cancel_lock(ls, lkb, &args);
+
+	if (error == -DLM_ECANCEL)
+		error = 0;
+	if (error)
+		goto out_put;
+
+	/* this lkb was removed from the WAITING queue */
+	if (lkb->lkb_grmode == DLM_LOCK_IV) {
+		spin_lock(&ua->proc->locks_spin);
+		list_del(&lkb->lkb_ownqueue);
+		spin_unlock(&ua->proc->locks_spin);
+		unhold_lkb(lkb);
+	}
+ out_put:
+	dlm_put_lkb(lkb);
+ out:
+	unlock_recovery(ls);
+	return error;
+}
+
+static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
+{
+	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+	if (ua->lksb.sb_lvbptr)
+		kfree(ua->lksb.sb_lvbptr);
+	kfree(ua);
+	lkb->lkb_astparam = (long)NULL;
+
+	/* TODO: propogate to master if needed */
+	return 0;
+}
+
+/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
+   Regardless of what rsb queue the lock is on, it's removed and freed. */
+
+static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
+{
+	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
+	struct dlm_args args;
+	int error;
+
+	/* FIXME: we need to handle the case where the lkb is in limbo
+	   while the rsb is being looked up, currently we assert in
+	   _unlock_lock/is_remote because rsb nodeid is -1. */
+
+	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
+
+	error = unlock_lock(ls, lkb, &args);
+	if (error == -DLM_EUNLOCK)
+		error = 0;
+	return error;
+}
+
+/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
+   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
+   which we clear here. */
+
+/* proc CLOSING flag is set so no more device_reads should look at proc->asts
+   list, and no more device_writes should add lkb's to proc->locks list; so we
+   shouldn't need to take asts_spin or locks_spin here.  this assumes that
+   device reads/writes/closes are serialized -- FIXME: we may need to serialize
+   them ourself. */
+
+void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
+{
+	struct dlm_lkb *lkb, *safe;
+
+	lock_recovery(ls);
+	mutex_lock(&ls->ls_clear_proc_locks);
+
+	list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
+		if (lkb->lkb_ast_type) {
+			list_del(&lkb->lkb_astqueue);
+			unhold_lkb(lkb);
+		}
+
+		list_del(&lkb->lkb_ownqueue);
+
+		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
+			lkb->lkb_flags |= DLM_IFL_ORPHAN;
+			orphan_proc_lock(ls, lkb);
+		} else {
+			lkb->lkb_flags |= DLM_IFL_DEAD;
+			unlock_proc_lock(ls, lkb);
+		}
+
+		/* this removes the reference for the proc->locks list
+		   added by dlm_user_request, it may result in the lkb
+		   being freed */
+
+		dlm_put_lkb(lkb);
+	}
+	mutex_unlock(&ls->ls_clear_proc_locks);
+	unlock_recovery(ls);
+}
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 56cdc073..8d2660f 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -14,6 +14,7 @@
 #define __LOCK_DOT_H__
 
 void dlm_print_rsb(struct dlm_rsb *r);
+void dlm_print_lkb(struct dlm_lkb *lkb);
 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery);
 int dlm_modes_compat(int mode1, int mode2);
 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
@@ -31,6 +32,16 @@
 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
 
+int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode,
+	uint32_t flags, void *name, unsigned int namelen, uint32_t parent_lkid);
+int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+	int mode, uint32_t flags, uint32_t lkid, char *lvb_in);
+int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+	uint32_t flags, uint32_t lkid, char *lvb_in);
+int dlm_user_cancel(struct dlm_ls *ls,  struct dlm_user_args *ua_tmp,
+	uint32_t flags, uint32_t lkid);
+void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc);
+
 static inline int is_master(struct dlm_rsb *r)
 {
 	return !r->res_nodeid;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 9ed4b70..3f6cb42 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -270,12 +270,36 @@
 	return ls;
 }
 
-struct dlm_ls *dlm_find_lockspace_local(void *id)
+struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 {
-	struct dlm_ls *ls = id;
+	struct dlm_ls *ls;
 
 	spin_lock(&lslist_lock);
-	ls->ls_count++;
+	list_for_each_entry(ls, &lslist, ls_list) {
+		if (ls->ls_local_handle == lockspace) {
+			ls->ls_count++;
+			goto out;
+		}
+	}
+	ls = NULL;
+ out:
+	spin_unlock(&lslist_lock);
+	return ls;
+}
+
+struct dlm_ls *dlm_find_lockspace_device(int minor)
+{
+	struct dlm_ls *ls;
+
+	spin_lock(&lslist_lock);
+	list_for_each_entry(ls, &lslist, ls_list) {
+		if (ls->ls_device.minor == minor) {
+			ls->ls_count++;
+			goto out;
+		}
+	}
+	ls = NULL;
+ out:
 	spin_unlock(&lslist_lock);
 	return ls;
 }
@@ -436,6 +460,7 @@
 	init_rwsem(&ls->ls_in_recovery);
 	INIT_LIST_HEAD(&ls->ls_requestqueue);
 	mutex_init(&ls->ls_requestqueue_mutex);
+	mutex_init(&ls->ls_clear_proc_locks);
 
 	ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
 	if (!ls->ls_recover_buf)
@@ -444,6 +469,7 @@
 	INIT_LIST_HEAD(&ls->ls_recover_list);
 	spin_lock_init(&ls->ls_recover_list_lock);
 	ls->ls_recover_list_count = 0;
+	ls->ls_local_handle = ls;
 	init_waitqueue_head(&ls->ls_wait_general);
 	INIT_LIST_HEAD(&ls->ls_root_list);
 	init_rwsem(&ls->ls_root_sem);
diff --git a/fs/dlm/lockspace.h b/fs/dlm/lockspace.h
index 17bd3ba..891eabb 100644
--- a/fs/dlm/lockspace.h
+++ b/fs/dlm/lockspace.h
@@ -18,6 +18,7 @@
 void dlm_lockspace_exit(void);
 struct dlm_ls *dlm_find_lockspace_global(uint32_t id);
 struct dlm_ls *dlm_find_lockspace_local(void *id);
+struct dlm_ls *dlm_find_lockspace_device(int minor);
 void dlm_put_lockspace(struct dlm_ls *ls);
 
 #endif				/* __LOCKSPACE_DOT_H__ */
diff --git a/fs/dlm/main.c b/fs/dlm/main.c
index 81bf4cb..a8da8dc 100644
--- a/fs/dlm/main.c
+++ b/fs/dlm/main.c
@@ -14,6 +14,7 @@
 #include "dlm_internal.h"
 #include "lockspace.h"
 #include "lock.h"
+#include "user.h"
 #include "memory.h"
 #include "lowcomms.h"
 #include "config.h"
@@ -50,10 +51,16 @@
 	if (error)
 		goto out_debug;
 
+	error = dlm_user_init();
+	if (error)
+		goto out_lowcomms;
+
 	printk("DLM (built %s %s) installed\n", __DATE__, __TIME__);
 
 	return 0;
 
+ out_lowcomms:
+	dlm_lowcomms_exit();
  out_debug:
 	dlm_unregister_debugfs();
  out_config:
@@ -68,6 +75,7 @@
 
 static void __exit exit_dlm(void)
 {
+	dlm_user_exit();
 	dlm_lowcomms_exit();
 	dlm_config_exit();
 	dlm_memory_exit();
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index f7cf458..48dfc27 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -84,6 +84,15 @@
 
 void free_lkb(struct dlm_lkb *lkb)
 {
+	if (lkb->lkb_flags & DLM_IFL_USER) {
+		struct dlm_user_args *ua;
+		ua = (struct dlm_user_args *)lkb->lkb_astparam;
+		if (ua) {
+			if (ua->lksb.sb_lvbptr)
+				kfree(ua->lksb.sb_lvbptr);
+			kfree(ua);
+		}
+	}
 	kmem_cache_free(lkb_cache, lkb);
 }
 
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
new file mode 100644
index 0000000..1f05960
--- /dev/null
+++ b/fs/dlm/user.c
@@ -0,0 +1,769 @@
+/*
+ * Copyright (C) 2006 Red Hat, Inc.  All rights reserved.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU General Public License v.2.
+ */
+
+#include <linux/miscdevice.h>
+#include <linux/init.h>
+#include <linux/wait.h>
+#include <linux/module.h>
+#include <linux/file.h>
+#include <linux/fs.h>
+#include <linux/poll.h>
+#include <linux/signal.h>
+#include <linux/spinlock.h>
+#include <linux/dlm.h>
+#include <linux/dlm_device.h>
+
+#include "dlm_internal.h"
+#include "lockspace.h"
+#include "lock.h"
+#include "lvb_table.h"
+
+static const char *name_prefix="dlm";
+static struct miscdevice ctl_device;
+static struct file_operations device_fops;
+
+#ifdef CONFIG_COMPAT
+
+struct dlm_lock_params32 {
+	__u8 mode;
+	__u8 namelen;
+	__u16 flags;
+	__u32 lkid;
+	__u32 parent;
+
+	__u32 castparam;
+	__u32 castaddr;
+	__u32 bastparam;
+	__u32 bastaddr;
+	__u32 lksb;
+
+	char lvb[DLM_USER_LVB_LEN];
+	char name[0];
+};
+
+struct dlm_write_request32 {
+	__u32 version[3];
+	__u8 cmd;
+	__u8 is64bit;
+	__u8 unused[2];
+
+	union  {
+		struct dlm_lock_params32 lock;
+		struct dlm_lspace_params lspace;
+	} i;
+};
+
+struct dlm_lksb32 {
+	__u32 sb_status;
+	__u32 sb_lkid;
+	__u8 sb_flags;
+	__u32 sb_lvbptr;
+};
+
+struct dlm_lock_result32 {
+	__u32 length;
+	__u32 user_astaddr;
+	__u32 user_astparam;
+	__u32 user_lksb;
+	struct dlm_lksb32 lksb;
+	__u8 bast_mode;
+	__u8 unused[3];
+	/* Offsets may be zero if no data is present */
+	__u32 lvb_offset;
+};
+
+static void compat_input(struct dlm_write_request *kb,
+			 struct dlm_write_request32 *kb32)
+{
+	kb->version[0] = kb32->version[0];
+	kb->version[1] = kb32->version[1];
+	kb->version[2] = kb32->version[2];
+
+	kb->cmd = kb32->cmd;
+	kb->is64bit = kb32->is64bit;
+	if (kb->cmd == DLM_USER_CREATE_LOCKSPACE ||
+	    kb->cmd == DLM_USER_REMOVE_LOCKSPACE) {
+		kb->i.lspace.flags = kb32->i.lspace.flags;
+		kb->i.lspace.minor = kb32->i.lspace.minor;
+		strcpy(kb->i.lspace.name, kb32->i.lspace.name);
+	} else {
+		kb->i.lock.mode = kb32->i.lock.mode;
+		kb->i.lock.namelen = kb32->i.lock.namelen;
+		kb->i.lock.flags = kb32->i.lock.flags;
+		kb->i.lock.lkid = kb32->i.lock.lkid;
+		kb->i.lock.parent = kb32->i.lock.parent;
+		kb->i.lock.castparam = (void *)(long)kb32->i.lock.castparam;
+		kb->i.lock.castaddr = (void *)(long)kb32->i.lock.castaddr;
+		kb->i.lock.bastparam = (void *)(long)kb32->i.lock.bastparam;
+		kb->i.lock.bastaddr = (void *)(long)kb32->i.lock.bastaddr;
+		kb->i.lock.lksb = (void *)(long)kb32->i.lock.lksb;
+		memcpy(kb->i.lock.lvb, kb32->i.lock.lvb, DLM_USER_LVB_LEN);
+		memcpy(kb->i.lock.name, kb32->i.lock.name, kb->i.lock.namelen);
+	}
+}
+
+static void compat_output(struct dlm_lock_result *res,
+			  struct dlm_lock_result32 *res32)
+{
+	res32->length = res->length - (sizeof(struct dlm_lock_result) -
+				       sizeof(struct dlm_lock_result32));
+	res32->user_astaddr = (__u32)(long)res->user_astaddr;
+	res32->user_astparam = (__u32)(long)res->user_astparam;
+	res32->user_lksb = (__u32)(long)res->user_lksb;
+	res32->bast_mode = res->bast_mode;
+
+	res32->lvb_offset = res->lvb_offset;
+	res32->length = res->length;
+
+	res32->lksb.sb_status = res->lksb.sb_status;
+	res32->lksb.sb_flags = res->lksb.sb_flags;
+	res32->lksb.sb_lkid = res->lksb.sb_lkid;
+	res32->lksb.sb_lvbptr = (__u32)(long)res->lksb.sb_lvbptr;
+}
+#endif
+
+
+void dlm_user_add_ast(struct dlm_lkb *lkb, int type)
+{
+	struct dlm_ls *ls;
+	struct dlm_user_args *ua;
+	struct dlm_user_proc *proc;
+
+	/* dlm_clear_proc_locks() sets ORPHAN/DEAD flag on each
+	   lkb before dealing with it.  We need to check this
+	   flag before taking ls_clear_proc_locks mutex because if
+	   it's set, dlm_clear_proc_locks() holds the mutex. */
+
+	if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) {
+		/* log_print("user_add_ast skip1 %x", lkb->lkb_flags); */
+		return;
+	}
+
+	ls = lkb->lkb_resource->res_ls;
+	mutex_lock(&ls->ls_clear_proc_locks);
+
+	/* If ORPHAN/DEAD flag is set, it means the process is dead so an ast
+	   can't be delivered.  For ORPHAN's, dlm_clear_proc_locks() freed
+	   lkb->ua so we can't try to use it. */
+
+	if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) {
+		/* log_print("user_add_ast skip2 %x", lkb->lkb_flags); */
+		goto out;
+	}
+
+	DLM_ASSERT(lkb->lkb_astparam, dlm_print_lkb(lkb););
+	ua = (struct dlm_user_args *)lkb->lkb_astparam;
+	proc = ua->proc;
+
+	if (type == AST_BAST && ua->bastaddr == NULL)
+		goto out;
+
+	spin_lock(&proc->asts_spin);
+	if (!(lkb->lkb_ast_type & (AST_COMP | AST_BAST))) {
+		kref_get(&lkb->lkb_ref);
+		list_add_tail(&lkb->lkb_astqueue, &proc->asts);
+		lkb->lkb_ast_type |= type;
+		wake_up_interruptible(&proc->wait);
+	}
+
+	/* We want to copy the lvb to userspace when the completion
+	   ast is read if the status is 0, the lock has an lvb and
+	   lvb_ops says we should.  We could probably have set_lvb_lock()
+	   set update_user_lvb instead and not need old_mode */
+
+	if ((lkb->lkb_ast_type & AST_COMP) &&
+	    (lkb->lkb_lksb->sb_status == 0) &&
+	    lkb->lkb_lksb->sb_lvbptr &&
+	    dlm_lvb_operations[ua->old_mode + 1][lkb->lkb_grmode + 1])
+		ua->update_user_lvb = 1;
+	else
+		ua->update_user_lvb = 0;
+
+	spin_unlock(&proc->asts_spin);
+ out:
+	mutex_unlock(&ls->ls_clear_proc_locks);
+}
+
+static int device_user_lock(struct dlm_user_proc *proc,
+			    struct dlm_lock_params *params)
+{
+	struct dlm_ls *ls;
+	struct dlm_user_args *ua;
+	int error = -ENOMEM;
+
+	ls = dlm_find_lockspace_local(proc->lockspace);
+	if (!ls)
+		return -ENOENT;
+
+	if (!params->castaddr || !params->lksb) {
+		error = -EINVAL;
+		goto out;
+	}
+
+	ua = kzalloc(sizeof(struct dlm_user_args), GFP_KERNEL);
+	if (!ua)
+		goto out;
+	ua->proc = proc;
+	ua->user_lksb = params->lksb;
+	ua->castparam = params->castparam;
+	ua->castaddr = params->castaddr;
+	ua->bastparam = params->bastparam;
+	ua->bastaddr = params->bastaddr;
+
+	if (params->flags & DLM_LKF_CONVERT)
+		error = dlm_user_convert(ls, ua,
+				         params->mode, params->flags,
+				         params->lkid, params->lvb);
+	else {
+		error = dlm_user_request(ls, ua,
+					 params->mode, params->flags,
+					 params->name, params->namelen,
+					 params->parent);
+		if (!error)
+			error = ua->lksb.sb_lkid;
+	}
+ out:
+	dlm_put_lockspace(ls);
+	return error;
+}
+
+static int device_user_unlock(struct dlm_user_proc *proc,
+			      struct dlm_lock_params *params)
+{
+	struct dlm_ls *ls;
+	struct dlm_user_args *ua;
+	int error = -ENOMEM;
+
+	ls = dlm_find_lockspace_local(proc->lockspace);
+	if (!ls)
+		return -ENOENT;
+
+	ua = kzalloc(sizeof(struct dlm_user_args), GFP_KERNEL);
+	if (!ua)
+		goto out;
+	ua->proc = proc;
+	ua->user_lksb = params->lksb;
+	ua->castparam = params->castparam;
+	ua->castaddr = params->castaddr;
+
+	if (params->flags & DLM_LKF_CANCEL)
+		error = dlm_user_cancel(ls, ua, params->flags, params->lkid);
+	else
+		error = dlm_user_unlock(ls, ua, params->flags, params->lkid,
+					params->lvb);
+ out:
+	dlm_put_lockspace(ls);
+	return error;
+}
+
+static int device_create_lockspace(struct dlm_lspace_params *params)
+{
+	dlm_lockspace_t *lockspace;
+	struct dlm_ls *ls;
+	int error, len;
+
+	if (!capable(CAP_SYS_ADMIN))
+		return -EPERM;
+
+	error = dlm_new_lockspace(params->name, strlen(params->name),
+				  &lockspace, 0, DLM_USER_LVB_LEN);
+	if (error)
+		return error;
+
+	ls = dlm_find_lockspace_local(lockspace);
+	if (!ls)
+		return -ENOENT;
+
+	error = -ENOMEM;
+	len = strlen(params->name) + strlen(name_prefix) + 2;
+	ls->ls_device.name = kzalloc(len, GFP_KERNEL);
+	if (!ls->ls_device.name)
+		goto fail;
+	snprintf((char *)ls->ls_device.name, len, "%s_%s", name_prefix,
+		 params->name);
+	ls->ls_device.fops = &device_fops;
+	ls->ls_device.minor = MISC_DYNAMIC_MINOR;
+
+	error = misc_register(&ls->ls_device);
+	if (error) {
+		kfree(ls->ls_device.name);
+		goto fail;
+	}
+
+	error = ls->ls_device.minor;
+	dlm_put_lockspace(ls);
+	return error;
+
+ fail:
+	dlm_put_lockspace(ls);
+	dlm_release_lockspace(lockspace, 0);
+	return error;
+}
+
+static int device_remove_lockspace(struct dlm_lspace_params *params)
+{
+	dlm_lockspace_t *lockspace;
+	struct dlm_ls *ls;
+	int error;
+
+	if (!capable(CAP_SYS_ADMIN))
+		return -EPERM;
+
+	ls = dlm_find_lockspace_device(params->minor);
+	if (!ls)
+		return -ENOENT;
+
+	error = misc_deregister(&ls->ls_device);
+	if (error) {
+		dlm_put_lockspace(ls);
+		goto out;
+	}
+	kfree(ls->ls_device.name);
+
+	lockspace = ls->ls_local_handle;
+
+	/* dlm_release_lockspace waits for references to go to zero,
+	   so all processes will need to close their device for the ls
+	   before the release will procede */
+
+	dlm_put_lockspace(ls);
+	error = dlm_release_lockspace(lockspace, 0);
+out:
+	return error;
+}
+
+/* Check the user's version matches ours */
+static int check_version(struct dlm_write_request *req)
+{
+	if (req->version[0] != DLM_DEVICE_VERSION_MAJOR ||
+	    (req->version[0] == DLM_DEVICE_VERSION_MAJOR &&
+	     req->version[1] > DLM_DEVICE_VERSION_MINOR)) {
+
+		printk(KERN_DEBUG "dlm: process %s (%d) version mismatch "
+		       "user (%d.%d.%d) kernel (%d.%d.%d)\n",
+		       current->comm,
+		       current->pid,
+		       req->version[0],
+		       req->version[1],
+		       req->version[2],
+		       DLM_DEVICE_VERSION_MAJOR,
+		       DLM_DEVICE_VERSION_MINOR,
+		       DLM_DEVICE_VERSION_PATCH);
+		return -EINVAL;
+	}
+	return 0;
+}
+
+/*
+ * device_write
+ *
+ *   device_user_lock
+ *     dlm_user_request -> request_lock
+ *     dlm_user_convert -> convert_lock
+ *
+ *   device_user_unlock
+ *     dlm_user_unlock -> unlock_lock
+ *     dlm_user_cancel -> cancel_lock
+ *
+ *   device_create_lockspace
+ *     dlm_new_lockspace
+ *
+ *   device_remove_lockspace
+ *     dlm_release_lockspace
+ */
+
+/* a write to a lockspace device is a lock or unlock request, a write
+   to the control device is to create/remove a lockspace */
+
+static ssize_t device_write(struct file *file, const char __user *buf,
+			    size_t count, loff_t *ppos)
+{
+	struct dlm_user_proc *proc = file->private_data;
+	struct dlm_write_request *kbuf;
+	sigset_t tmpsig, allsigs;
+	int error;
+
+#ifdef CONFIG_COMPAT
+	if (count < sizeof(struct dlm_write_request32))
+#else
+	if (count < sizeof(struct dlm_write_request))
+#endif
+		return -EINVAL;
+
+	kbuf = kmalloc(count, GFP_KERNEL);
+	if (!kbuf)
+		return -ENOMEM;
+
+	if (copy_from_user(kbuf, buf, count)) {
+		error = -EFAULT;
+		goto out_free;
+	}
+
+	if (check_version(kbuf)) {
+		error = -EBADE;
+		goto out_free;
+	}
+
+#ifdef CONFIG_COMPAT
+	if (!kbuf->is64bit) {
+		struct dlm_write_request32 *k32buf;
+		k32buf = (struct dlm_write_request32 *)kbuf;
+		kbuf = kmalloc(count + (sizeof(struct dlm_write_request) -
+			       sizeof(struct dlm_write_request32)), GFP_KERNEL);
+		if (!kbuf)
+			return -ENOMEM;
+
+		if (proc)
+			set_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags);
+		compat_input(kbuf, k32buf);
+		kfree(k32buf);
+	}
+#endif
+
+	/* do we really need this? can a write happen after a close? */
+	if ((kbuf->cmd == DLM_USER_LOCK || kbuf->cmd == DLM_USER_UNLOCK) &&
+	    test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))
+		return -EINVAL;
+
+	sigfillset(&allsigs);
+	sigprocmask(SIG_BLOCK, &allsigs, &tmpsig);
+
+	error = -EINVAL;
+
+	switch (kbuf->cmd)
+	{
+	case DLM_USER_LOCK:
+		if (!proc) {
+			log_print("no locking on control device");
+			goto out_sig;
+		}
+		error = device_user_lock(proc, &kbuf->i.lock);
+		break;
+
+	case DLM_USER_UNLOCK:
+		if (!proc) {
+			log_print("no locking on control device");
+			goto out_sig;
+		}
+		error = device_user_unlock(proc, &kbuf->i.lock);
+		break;
+
+	case DLM_USER_CREATE_LOCKSPACE:
+		if (proc) {
+			log_print("create/remove only on control device");
+			goto out_sig;
+		}
+		error = device_create_lockspace(&kbuf->i.lspace);
+		break;
+
+	case DLM_USER_REMOVE_LOCKSPACE:
+		if (proc) {
+			log_print("create/remove only on control device");
+			goto out_sig;
+		}
+		error = device_remove_lockspace(&kbuf->i.lspace);
+		break;
+
+	default:
+		log_print("Unknown command passed to DLM device : %d\n",
+			  kbuf->cmd);
+	}
+
+ out_sig:
+	sigprocmask(SIG_SETMASK, &tmpsig, NULL);
+	recalc_sigpending();
+ out_free:
+	kfree(kbuf);
+	return error;
+}
+
+/* Every process that opens the lockspace device has its own "proc" structure
+   hanging off the open file that's used to keep track of locks owned by the
+   process and asts that need to be delivered to the process. */
+
+static int device_open(struct inode *inode, struct file *file)
+{
+	struct dlm_user_proc *proc;
+	struct dlm_ls *ls;
+
+	ls = dlm_find_lockspace_device(iminor(inode));
+	if (!ls)
+		return -ENOENT;
+
+	proc = kzalloc(sizeof(struct dlm_user_proc), GFP_KERNEL);
+	if (!proc) {
+		dlm_put_lockspace(ls);
+		return -ENOMEM;
+	}
+
+	proc->lockspace = ls->ls_local_handle;
+	INIT_LIST_HEAD(&proc->asts);
+	INIT_LIST_HEAD(&proc->locks);
+	spin_lock_init(&proc->asts_spin);
+	spin_lock_init(&proc->locks_spin);
+	init_waitqueue_head(&proc->wait);
+	file->private_data = proc;
+
+	return 0;
+}
+
+static int device_close(struct inode *inode, struct file *file)
+{
+	struct dlm_user_proc *proc = file->private_data;
+	struct dlm_ls *ls;
+	sigset_t tmpsig, allsigs;
+
+	ls = dlm_find_lockspace_local(proc->lockspace);
+	if (!ls)
+		return -ENOENT;
+
+	sigfillset(&allsigs);
+	sigprocmask(SIG_BLOCK, &allsigs, &tmpsig);
+
+	set_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags);
+
+	dlm_clear_proc_locks(ls, proc);
+
+	/* at this point no more lkb's should exist for this lockspace,
+	   so there's no chance of dlm_user_add_ast() being called and
+	   looking for lkb->ua->proc */
+
+	kfree(proc);
+	file->private_data = NULL;
+
+	dlm_put_lockspace(ls);
+	dlm_put_lockspace(ls);  /* for the find in device_open() */
+
+	/* FIXME: AUTOFREE: if this ls is no longer used do
+	   device_remove_lockspace() */
+
+	sigprocmask(SIG_SETMASK, &tmpsig, NULL);
+	recalc_sigpending();
+
+	return 0;
+}
+
+static int copy_result_to_user(struct dlm_user_args *ua, int compat, int type,
+			       int bmode, char __user *buf, size_t count)
+{
+#ifdef CONFIG_COMPAT
+	struct dlm_lock_result32 result32;
+#endif
+	struct dlm_lock_result result;
+	void *resultptr;
+	int error=0;
+	int len;
+	int struct_len;
+
+	memset(&result, 0, sizeof(struct dlm_lock_result));
+	memcpy(&result.lksb, &ua->lksb, sizeof(struct dlm_lksb));
+	result.user_lksb = ua->user_lksb;
+
+	/* FIXME: dlm1 provides for the user's bastparam/addr to not be updated
+	   in a conversion unless the conversion is successful.  See code
+	   in dlm_user_convert() for updating ua from ua_tmp.  OpenVMS, though,
+	   notes that a new blocking AST address and parameter are set even if
+	   the conversion fails, so maybe we should just do that. */
+
+	if (type == AST_BAST) {
+		result.user_astaddr = ua->bastaddr;
+		result.user_astparam = ua->bastparam;
+		result.bast_mode = bmode;
+	} else {
+		result.user_astaddr = ua->castaddr;
+		result.user_astparam = ua->castparam;
+	}
+
+#ifdef CONFIG_COMPAT
+	if (compat)
+		len = sizeof(struct dlm_lock_result32);
+	else
+#endif
+		len = sizeof(struct dlm_lock_result);
+	struct_len = len;
+
+	/* copy lvb to userspace if there is one, it's been updated, and
+	   the user buffer has space for it */
+
+	if (ua->update_user_lvb && ua->lksb.sb_lvbptr &&
+	    count >= len + DLM_USER_LVB_LEN) {
+		if (copy_to_user(buf+len, ua->lksb.sb_lvbptr,
+				 DLM_USER_LVB_LEN)) {
+			error = -EFAULT;
+			goto out;
+		}
+
+		result.lvb_offset = len;
+		len += DLM_USER_LVB_LEN;
+	}
+
+	result.length = len;
+	resultptr = &result;
+#ifdef CONFIG_COMPAT
+	if (compat) {
+		compat_output(&result, &result32);
+		resultptr = &result32;
+	}
+#endif
+
+	if (copy_to_user(buf, resultptr, struct_len))
+		error = -EFAULT;
+	else
+		error = len;
+ out:
+	return error;
+}
+
+/* a read returns a single ast described in a struct dlm_lock_result */
+
+static ssize_t device_read(struct file *file, char __user *buf, size_t count,
+			   loff_t *ppos)
+{
+	struct dlm_user_proc *proc = file->private_data;
+	struct dlm_lkb *lkb;
+	struct dlm_user_args *ua;
+	DECLARE_WAITQUEUE(wait, current);
+	int error, type=0, bmode=0, removed = 0;
+
+#ifdef CONFIG_COMPAT
+	if (count < sizeof(struct dlm_lock_result32))
+#else
+	if (count < sizeof(struct dlm_lock_result))
+#endif
+		return -EINVAL;
+
+	/* do we really need this? can a read happen after a close? */
+	if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))
+		return -EINVAL;
+
+	spin_lock(&proc->asts_spin);
+	if (list_empty(&proc->asts)) {
+		if (file->f_flags & O_NONBLOCK) {
+			spin_unlock(&proc->asts_spin);
+			return -EAGAIN;
+		}
+
+		add_wait_queue(&proc->wait, &wait);
+
+	repeat:
+		set_current_state(TASK_INTERRUPTIBLE);
+		if (list_empty(&proc->asts) && !signal_pending(current)) {
+			spin_unlock(&proc->asts_spin);
+			schedule();
+			spin_lock(&proc->asts_spin);
+			goto repeat;
+		}
+		set_current_state(TASK_RUNNING);
+		remove_wait_queue(&proc->wait, &wait);
+
+		if (signal_pending(current)) {
+			spin_unlock(&proc->asts_spin);
+			return -ERESTARTSYS;
+		}
+	}
+
+	if (list_empty(&proc->asts)) {
+		spin_unlock(&proc->asts_spin);
+		return -EAGAIN;
+	}
+
+	/* there may be both completion and blocking asts to return for
+	   the lkb, don't remove lkb from asts list unless no asts remain */
+
+	lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_astqueue);
+
+	if (lkb->lkb_ast_type & AST_COMP) {
+		lkb->lkb_ast_type &= ~AST_COMP;
+		type = AST_COMP;
+	} else if (lkb->lkb_ast_type & AST_BAST) {
+		lkb->lkb_ast_type &= ~AST_BAST;
+		type = AST_BAST;
+		bmode = lkb->lkb_bastmode;
+	}
+
+	if (!lkb->lkb_ast_type) {
+		list_del(&lkb->lkb_astqueue);
+		removed = 1;
+	}
+	spin_unlock(&proc->asts_spin);
+
+	ua = (struct dlm_user_args *)lkb->lkb_astparam;
+	error = copy_result_to_user(ua,
+			 	test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
+				type, bmode, buf, count);
+
+	/* removes reference for the proc->asts lists added by
+	   dlm_user_add_ast() and may result in the lkb being freed */
+	if (removed)
+		dlm_put_lkb(lkb);
+
+	return error;
+}
+
+static unsigned int device_poll(struct file *file, poll_table *wait)
+{
+	struct dlm_user_proc *proc = file->private_data;
+
+	poll_wait(file, &proc->wait, wait);
+
+	spin_lock(&proc->asts_spin);
+	if (!list_empty(&proc->asts)) {
+		spin_unlock(&proc->asts_spin);
+		return POLLIN | POLLRDNORM;
+	}
+	spin_unlock(&proc->asts_spin);
+	return 0;
+}
+
+static int ctl_device_open(struct inode *inode, struct file *file)
+{
+	file->private_data = NULL;
+	return 0;
+}
+
+static int ctl_device_close(struct inode *inode, struct file *file)
+{
+	return 0;
+}
+
+static struct file_operations device_fops = {
+	.open    = device_open,
+	.release = device_close,
+	.read    = device_read,
+	.write   = device_write,
+	.poll    = device_poll,
+	.owner   = THIS_MODULE,
+};
+
+static struct file_operations ctl_device_fops = {
+	.open    = ctl_device_open,
+	.release = ctl_device_close,
+	.write   = device_write,
+	.owner   = THIS_MODULE,
+};
+
+int dlm_user_init(void)
+{
+	int error;
+
+	ctl_device.name = "dlm-control";
+	ctl_device.fops = &ctl_device_fops;
+	ctl_device.minor = MISC_DYNAMIC_MINOR;
+
+	error = misc_register(&ctl_device);
+	if (error)
+		log_print("misc_register failed for control device");
+
+	return error;
+}
+
+void dlm_user_exit(void)
+{
+	misc_deregister(&ctl_device);
+}
+
diff --git a/fs/dlm/user.h b/fs/dlm/user.h
new file mode 100644
index 0000000..d38e9f3
--- /dev/null
+++ b/fs/dlm/user.h
@@ -0,0 +1,16 @@
+/*
+ * Copyright (C) 2006 Red Hat, Inc.  All rights reserved.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU General Public License v.2.
+ */
+
+#ifndef __USER_DOT_H__
+#define __USER_DOT_H__
+
+void dlm_user_add_ast(struct dlm_lkb *lkb, int type);
+int dlm_user_init(void);
+void dlm_user_exit(void);
+
+#endif