Merge branch 'rwsem-optimizations'

Merge rwsem optimizations from Michel Lespinasse:
 "These patches extend Alex Shi's work (which added write lock stealing
  on the rwsem slow path) in order to provide rwsem write lock stealing
  on the fast path (that is, without taking the rwsem's wait_lock).

  I have unfortunately been unable to push this through -next before, due
  to Ingo Molnar / David Howells / Peter Zijlstra being busy with other
  things.  However, this has gotten some attention from Rik van Riel and
  Davidlohr Bueso who both commented that they felt this was ready for
  v3.10, and Ingo Molnar has said that he was OK with me pushing
  directly to you.  So, here goes :)

  Davidlohr got the following test results from pgbench running on a
  quad-core laptop:

    | db_size  | clients |  tps-vanilla   |   tps-rwsem  |
    +----------+---------+----------------+--------------+
    | 160 MB   |       1 |           5803 |         6906 | + 19.0%
    | 160 MB   |       2 |          13092 |        15931 |
    | 160 MB   |       4 |          29412 |        33021 |
    | 160 MB   |       8 |          32448 |        34626 |
    | 160 MB   |      16 |          32758 |        33098 |
    | 160 MB   |      20 |          26940 |        31343 | + 16.3%
    | 160 MB   |      30 |          25147 |        28961 |
    | 160 MB   |      40 |          25484 |        26902 |
    | 160 MB   |      50 |          24528 |        25760 |
    ------------------------------------------------------
    | 1.6 GB   |       1 |           5733 |         7729 | + 34.8%
    | 1.6 GB   |       2 |           9411 |        19009 | + 101.9%
    | 1.6 GB   |       4 |          31818 |        33185 |
    | 1.6 GB   |       8 |          33700 |        34550 |
    | 1.6 GB   |      16 |          32751 |        33079 |
    | 1.6 GB   |      20 |          30919 |        31494 |
    | 1.6 GB   |      30 |          28540 |        28535 |
    | 1.6 GB   |      40 |          26380 |        27054 |
    | 1.6 GB   |      50 |          25241 |        25591 |
    ------------------------------------------------------
    | 7.6 GB   |       1 |           5779 |         6224 |
    | 7.6 GB   |       2 |          10897 |        13611 | + 24.9%
    | 7.6 GB   |       4 |          32683 |        33108 |
    | 7.6 GB   |       8 |          33968 |        34712 |
    | 7.6 GB   |      16 |          32287 |        32895 |
    | 7.6 GB   |      20 |          27770 |        31689 | + 14.1%
    | 7.6 GB   |      30 |          26739 |        29003 |
    | 7.6 GB   |      40 |          24901 |        26683 |
    | 7.6 GB   |      50 |          17115 |        25925 | + 51.5%
    ------------------------------------------------------

  (Davidlohr also has one additional patch which further improves
  throughput, though I will ask him to send it directly to you as I have
  suggested some minor changes)."

* emailed patches from Michel Lespinasse <walken@google.com>:
  rwsem: no need for explicit signed longs
  x86 rwsem: avoid taking slow path when stealing write lock
  rwsem: do not block readers at head of queue if other readers are active
  rwsem: implement support for write lock stealing on the fastpath
  rwsem: simplify __rwsem_do_wake
  rwsem: skip initial trylock in rwsem_down_write_failed
  rwsem: avoid taking wait_lock in rwsem_down_write_failed
  rwsem: use cmpxchg for trying to steal write lock
  rwsem: more agressive lock stealing in rwsem_down_write_failed
  rwsem: simplify rwsem_down_write_failed
  rwsem: simplify rwsem_down_read_failed
  rwsem: move rwsem_down_failed_common code into rwsem_down_{read,write}_failed
  rwsem: shorter spinlocked section in rwsem_down_failed_common()
  rwsem: make the waiter type an enumeration rather than a bitmask
diff --git a/arch/x86/include/asm/rwsem.h b/arch/x86/include/asm/rwsem.h
index 2dbe4a7..cad82c9 100644
--- a/arch/x86/include/asm/rwsem.h
+++ b/arch/x86/include/asm/rwsem.h
@@ -105,8 +105,8 @@
 	asm volatile("# beginning down_write\n\t"
 		     LOCK_PREFIX "  xadd      %1,(%2)\n\t"
 		     /* adds 0xffff0001, returns the old value */
-		     "  test      %1,%1\n\t"
-		     /* was the count 0 before? */
+		     "  test " __ASM_SEL(%w1,%k1) "," __ASM_SEL(%w1,%k1) "\n\t"
+		     /* was the active mask 0 before? */
 		     "  jz        1f\n"
 		     "  call call_rwsem_down_write_failed\n"
 		     "1:\n"
@@ -126,11 +126,25 @@
  */
 static inline int __down_write_trylock(struct rw_semaphore *sem)
 {
-	long ret = cmpxchg(&sem->count, RWSEM_UNLOCKED_VALUE,
-			   RWSEM_ACTIVE_WRITE_BIAS);
-	if (ret == RWSEM_UNLOCKED_VALUE)
-		return 1;
-	return 0;
+	long result, tmp;
+	asm volatile("# beginning __down_write_trylock\n\t"
+		     "  mov          %0,%1\n\t"
+		     "1:\n\t"
+		     "  test " __ASM_SEL(%w1,%k1) "," __ASM_SEL(%w1,%k1) "\n\t"
+		     /* was the active mask 0 before? */
+		     "  jnz          2f\n\t"
+		     "  mov          %1,%2\n\t"
+		     "  add          %3,%2\n\t"
+		     LOCK_PREFIX "  cmpxchg  %2,%0\n\t"
+		     "  jnz	     1b\n\t"
+		     "2:\n\t"
+		     "  sete         %b1\n\t"
+		     "  movzbl       %b1, %k1\n\t"
+		     "# ending __down_write_trylock\n\t"
+		     : "+m" (sem->count), "=&a" (result), "=&r" (tmp)
+		     : "er" (RWSEM_ACTIVE_WRITE_BIAS)
+		     : "memory", "cc");
+	return result;
 }
 
 /*
diff --git a/lib/rwsem-spinlock.c b/lib/rwsem-spinlock.c
index 7542afb..9be8a91 100644
--- a/lib/rwsem-spinlock.c
+++ b/lib/rwsem-spinlock.c
@@ -9,12 +9,15 @@
 #include <linux/sched.h>
 #include <linux/export.h>
 
+enum rwsem_waiter_type {
+	RWSEM_WAITING_FOR_WRITE,
+	RWSEM_WAITING_FOR_READ
+};
+
 struct rwsem_waiter {
 	struct list_head list;
 	struct task_struct *task;
-	unsigned int flags;
-#define RWSEM_WAITING_FOR_READ	0x00000001
-#define RWSEM_WAITING_FOR_WRITE	0x00000002
+	enum rwsem_waiter_type type;
 };
 
 int rwsem_is_locked(struct rw_semaphore *sem)
@@ -67,26 +70,17 @@
 
 	waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
 
-	if (!wakewrite) {
-		if (waiter->flags & RWSEM_WAITING_FOR_WRITE)
-			goto out;
-		goto dont_wake_writers;
-	}
-
-	/*
-	 * as we support write lock stealing, we can't set sem->activity
-	 * to -1 here to indicate we get the lock. Instead, we wake it up
-	 * to let it go get it again.
-	 */
-	if (waiter->flags & RWSEM_WAITING_FOR_WRITE) {
-		wake_up_process(waiter->task);
+	if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
+		if (wakewrite)
+			/* Wake up a writer. Note that we do not grant it the
+			 * lock - it will have to acquire it when it runs. */
+			wake_up_process(waiter->task);
 		goto out;
 	}
 
 	/* grant an infinite number of read locks to the front of the queue */
- dont_wake_writers:
 	woken = 0;
-	while (waiter->flags & RWSEM_WAITING_FOR_READ) {
+	do {
 		struct list_head *next = waiter->list.next;
 
 		list_del(&waiter->list);
@@ -96,10 +90,10 @@
 		wake_up_process(tsk);
 		put_task_struct(tsk);
 		woken++;
-		if (list_empty(&sem->wait_list))
+		if (next == &sem->wait_list)
 			break;
 		waiter = list_entry(next, struct rwsem_waiter, list);
-	}
+	} while (waiter->type != RWSEM_WAITING_FOR_WRITE);
 
 	sem->activity += woken;
 
@@ -144,7 +138,7 @@
 
 	/* set up my own style of waitqueue */
 	waiter.task = tsk;
-	waiter.flags = RWSEM_WAITING_FOR_READ;
+	waiter.type = RWSEM_WAITING_FOR_READ;
 	get_task_struct(tsk);
 
 	list_add_tail(&waiter.list, &sem->wait_list);
@@ -201,7 +195,7 @@
 	/* set up my own style of waitqueue */
 	tsk = current;
 	waiter.task = tsk;
-	waiter.flags = RWSEM_WAITING_FOR_WRITE;
+	waiter.type = RWSEM_WAITING_FOR_WRITE;
 	list_add_tail(&waiter.list, &sem->wait_list);
 
 	/* wait for someone to release the lock */
diff --git a/lib/rwsem.c b/lib/rwsem.c
index ad5e0df..cf0ad2a 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -4,6 +4,7 @@
  * Derived from arch/i386/kernel/semaphore.c
  *
  * Writer lock-stealing by Alex Shi <alex.shi@intel.com>
+ * and Michel Lespinasse <walken@google.com>
  */
 #include <linux/rwsem.h>
 #include <linux/sched.h>
@@ -30,21 +31,22 @@
 
 EXPORT_SYMBOL(__init_rwsem);
 
+enum rwsem_waiter_type {
+	RWSEM_WAITING_FOR_WRITE,
+	RWSEM_WAITING_FOR_READ
+};
+
 struct rwsem_waiter {
 	struct list_head list;
 	struct task_struct *task;
-	unsigned int flags;
-#define RWSEM_WAITING_FOR_READ	0x00000001
-#define RWSEM_WAITING_FOR_WRITE	0x00000002
+	enum rwsem_waiter_type type;
 };
 
-/* Wake types for __rwsem_do_wake().  Note that RWSEM_WAKE_NO_ACTIVE and
- * RWSEM_WAKE_READ_OWNED imply that the spinlock must have been kept held
- * since the rwsem value was observed.
- */
-#define RWSEM_WAKE_ANY        0 /* Wake whatever's at head of wait list */
-#define RWSEM_WAKE_NO_ACTIVE  1 /* rwsem was observed with no active thread */
-#define RWSEM_WAKE_READ_OWNED 2 /* rwsem was observed to be read owned */
+enum rwsem_wake_type {
+	RWSEM_WAKE_ANY,		/* Wake whatever's at head of wait list */
+	RWSEM_WAKE_READERS,	/* Wake readers only */
+	RWSEM_WAKE_READ_OWNED	/* Waker thread holds the read lock */
+};
 
 /*
  * handle the lock release when processes blocked on it that can now run
@@ -57,46 +59,43 @@
  * - writers are only woken if downgrading is false
  */
 static struct rw_semaphore *
-__rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
+__rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type)
 {
 	struct rwsem_waiter *waiter;
 	struct task_struct *tsk;
 	struct list_head *next;
-	signed long woken, loop, adjustment;
+	long oldcount, woken, loop, adjustment;
 
 	waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
-	if (!(waiter->flags & RWSEM_WAITING_FOR_WRITE))
-		goto readers_only;
-
-	if (wake_type == RWSEM_WAKE_READ_OWNED)
-		/* Another active reader was observed, so wakeup is not
-		 * likely to succeed. Save the atomic op.
-		 */
+	if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
+		if (wake_type == RWSEM_WAKE_ANY)
+			/* Wake writer at the front of the queue, but do not
+			 * grant it the lock yet as we want other writers
+			 * to be able to steal it.  Readers, on the other hand,
+			 * will block as they will notice the queued writer.
+			 */
+			wake_up_process(waiter->task);
 		goto out;
+	}
 
-	/* Wake up the writing waiter and let the task grab the sem: */
-	wake_up_process(waiter->task);
-	goto out;
-
- readers_only:
-	/* If we come here from up_xxxx(), another thread might have reached
-	 * rwsem_down_failed_common() before we acquired the spinlock and
-	 * woken up a waiter, making it now active.  We prefer to check for
-	 * this first in order to not spend too much time with the spinlock
-	 * held if we're not going to be able to wake up readers in the end.
-	 *
-	 * Note that we do not need to update the rwsem count: any writer
-	 * trying to acquire rwsem will run rwsem_down_write_failed() due
-	 * to the waiting threads and block trying to acquire the spinlock.
-	 *
-	 * We use a dummy atomic update in order to acquire the cache line
-	 * exclusively since we expect to succeed and run the final rwsem
-	 * count adjustment pretty soon.
+	/* Writers might steal the lock before we grant it to the next reader.
+	 * We prefer to do the first reader grant before counting readers
+	 * so we can bail out early if a writer stole the lock.
 	 */
-	if (wake_type == RWSEM_WAKE_ANY &&
-	    rwsem_atomic_update(0, sem) < RWSEM_WAITING_BIAS)
-		/* Someone grabbed the sem for write already */
-		goto out;
+	adjustment = 0;
+	if (wake_type != RWSEM_WAKE_READ_OWNED) {
+		adjustment = RWSEM_ACTIVE_READ_BIAS;
+ try_reader_grant:
+		oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
+		if (unlikely(oldcount < RWSEM_WAITING_BIAS)) {
+			/* A writer stole the lock. Undo our reader grant. */
+			if (rwsem_atomic_update(-adjustment, sem) &
+						RWSEM_ACTIVE_MASK)
+				goto out;
+			/* Last active locker left. Retry waking readers. */
+			goto try_reader_grant;
+		}
+	}
 
 	/* Grant an infinite number of read locks to the readers at the front
 	 * of the queue.  Note we increment the 'active part' of the count by
@@ -112,17 +111,19 @@
 		waiter = list_entry(waiter->list.next,
 					struct rwsem_waiter, list);
 
-	} while (waiter->flags & RWSEM_WAITING_FOR_READ);
+	} while (waiter->type != RWSEM_WAITING_FOR_WRITE);
 
-	adjustment = woken * RWSEM_ACTIVE_READ_BIAS;
-	if (waiter->flags & RWSEM_WAITING_FOR_READ)
+	adjustment = woken * RWSEM_ACTIVE_READ_BIAS - adjustment;
+	if (waiter->type != RWSEM_WAITING_FOR_WRITE)
 		/* hit end of list above */
 		adjustment -= RWSEM_WAITING_BIAS;
 
-	rwsem_atomic_add(adjustment, sem);
+	if (adjustment)
+		rwsem_atomic_add(adjustment, sem);
 
 	next = sem->wait_list.next;
-	for (loop = woken; loop > 0; loop--) {
+	loop = woken;
+	do {
 		waiter = list_entry(next, struct rwsem_waiter, list);
 		next = waiter->list.next;
 		tsk = waiter->task;
@@ -130,7 +131,7 @@
 		waiter->task = NULL;
 		wake_up_process(tsk);
 		put_task_struct(tsk);
-	}
+	} while (--loop);
 
 	sem->wait_list.next = next;
 	next->prev = &sem->wait_list;
@@ -139,60 +140,21 @@
 	return sem;
 }
 
-/* Try to get write sem, caller holds sem->wait_lock: */
-static int try_get_writer_sem(struct rw_semaphore *sem,
-					struct rwsem_waiter *waiter)
-{
-	struct rwsem_waiter *fwaiter;
-	long oldcount, adjustment;
-
-	/* only steal when first waiter is writing */
-	fwaiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
-	if (!(fwaiter->flags & RWSEM_WAITING_FOR_WRITE))
-		return 0;
-
-	adjustment = RWSEM_ACTIVE_WRITE_BIAS;
-	/* Only one waiter in the queue: */
-	if (fwaiter == waiter && waiter->list.next == &sem->wait_list)
-		adjustment -= RWSEM_WAITING_BIAS;
-
-try_again_write:
-	oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
-	if (!(oldcount & RWSEM_ACTIVE_MASK)) {
-		/* No active lock: */
-		struct task_struct *tsk = waiter->task;
-
-		list_del(&waiter->list);
-		smp_mb();
-		put_task_struct(tsk);
-		tsk->state = TASK_RUNNING;
-		return 1;
-	}
-	/* some one grabbed the sem already */
-	if (rwsem_atomic_update(-adjustment, sem) & RWSEM_ACTIVE_MASK)
-		return 0;
-	goto try_again_write;
-}
-
 /*
- * wait for a lock to be granted
+ * wait for the read lock to be granted
  */
-static struct rw_semaphore __sched *
-rwsem_down_failed_common(struct rw_semaphore *sem,
-			 unsigned int flags, signed long adjustment)
+struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
 {
+	long count, adjustment = -RWSEM_ACTIVE_READ_BIAS;
 	struct rwsem_waiter waiter;
 	struct task_struct *tsk = current;
-	signed long count;
-
-	set_task_state(tsk, TASK_UNINTERRUPTIBLE);
 
 	/* set up my own style of waitqueue */
-	raw_spin_lock_irq(&sem->wait_lock);
 	waiter.task = tsk;
-	waiter.flags = flags;
+	waiter.type = RWSEM_WAITING_FOR_READ;
 	get_task_struct(tsk);
 
+	raw_spin_lock_irq(&sem->wait_lock);
 	if (list_empty(&sem->wait_list))
 		adjustment += RWSEM_WAITING_BIAS;
 	list_add_tail(&waiter.list, &sem->wait_list);
@@ -200,35 +162,24 @@
 	/* we're now waiting on the lock, but no longer actively locking */
 	count = rwsem_atomic_update(adjustment, sem);
 
-	/* If there are no active locks, wake the front queued process(es) up.
+	/* If there are no active locks, wake the front queued process(es).
 	 *
-	 * Alternatively, if we're called from a failed down_write(), there
-	 * were already threads queued before us and there are no active
-	 * writers, the lock must be read owned; so we try to wake any read
-	 * locks that were queued ahead of us. */
-	if (count == RWSEM_WAITING_BIAS)
-		sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE);
-	else if (count > RWSEM_WAITING_BIAS &&
-		 adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
-		sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
+	 * If there are no writers and we are first in the queue,
+	 * wake our own waiter to join the existing active readers !
+	 */
+	if (count == RWSEM_WAITING_BIAS ||
+	    (count > RWSEM_WAITING_BIAS &&
+	     adjustment != -RWSEM_ACTIVE_READ_BIAS))
+		sem = __rwsem_do_wake(sem, RWSEM_WAKE_ANY);
 
 	raw_spin_unlock_irq(&sem->wait_lock);
 
 	/* wait to be given the lock */
-	for (;;) {
+	while (true) {
+		set_task_state(tsk, TASK_UNINTERRUPTIBLE);
 		if (!waiter.task)
 			break;
-
-		raw_spin_lock_irq(&sem->wait_lock);
-		/* Try to get the writer sem, may steal from the head writer: */
-		if (flags == RWSEM_WAITING_FOR_WRITE)
-			if (try_get_writer_sem(sem, &waiter)) {
-				raw_spin_unlock_irq(&sem->wait_lock);
-				return sem;
-			}
-		raw_spin_unlock_irq(&sem->wait_lock);
 		schedule();
-		set_task_state(tsk, TASK_UNINTERRUPTIBLE);
 	}
 
 	tsk->state = TASK_RUNNING;
@@ -237,21 +188,62 @@
 }
 
 /*
- * wait for the read lock to be granted
- */
-struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
-{
-	return rwsem_down_failed_common(sem, RWSEM_WAITING_FOR_READ,
-					-RWSEM_ACTIVE_READ_BIAS);
-}
-
-/*
- * wait for the write lock to be granted
+ * wait until we successfully acquire the write lock
  */
 struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
 {
-	return rwsem_down_failed_common(sem, RWSEM_WAITING_FOR_WRITE,
-					-RWSEM_ACTIVE_WRITE_BIAS);
+	long count, adjustment = -RWSEM_ACTIVE_WRITE_BIAS;
+	struct rwsem_waiter waiter;
+	struct task_struct *tsk = current;
+
+	/* set up my own style of waitqueue */
+	waiter.task = tsk;
+	waiter.type = RWSEM_WAITING_FOR_WRITE;
+
+	raw_spin_lock_irq(&sem->wait_lock);
+	if (list_empty(&sem->wait_list))
+		adjustment += RWSEM_WAITING_BIAS;
+	list_add_tail(&waiter.list, &sem->wait_list);
+
+	/* we're now waiting on the lock, but no longer actively locking */
+	count = rwsem_atomic_update(adjustment, sem);
+
+	/* If there were already threads queued before us and there are no
+	 * active writers, the lock must be read owned; so we try to wake
+	 * any read locks that were queued ahead of us. */
+	if (count > RWSEM_WAITING_BIAS &&
+	    adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
+		sem = __rwsem_do_wake(sem, RWSEM_WAKE_READERS);
+
+	/* wait until we successfully acquire the lock */
+	set_task_state(tsk, TASK_UNINTERRUPTIBLE);
+	while (true) {
+		if (!(count & RWSEM_ACTIVE_MASK)) {
+			/* Try acquiring the write lock. */
+			count = RWSEM_ACTIVE_WRITE_BIAS;
+			if (!list_is_singular(&sem->wait_list))
+				count += RWSEM_WAITING_BIAS;
+			if (cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count) ==
+							RWSEM_WAITING_BIAS)
+				break;
+		}
+
+		raw_spin_unlock_irq(&sem->wait_lock);
+
+		/* Block until there are no active lockers. */
+		do {
+			schedule();
+			set_task_state(tsk, TASK_UNINTERRUPTIBLE);
+		} while ((count = sem->count) & RWSEM_ACTIVE_MASK);
+
+		raw_spin_lock_irq(&sem->wait_lock);
+	}
+
+	list_del(&waiter.list);
+	raw_spin_unlock_irq(&sem->wait_lock);
+	tsk->state = TASK_RUNNING;
+
+	return sem;
 }
 
 /*