drbd: don't BUG_ON, if bio_add_page of a single page to an empty bio fails

Just deal with it more gracefully, if we fail to add even a single page
to an empty bio. We used to BUG_ON() there, but it has been observed in
some Xen deployment, so we need to handle that case more robustly now.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index a56b107..9e9fc34 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -1073,6 +1073,16 @@
  * @mdev:	DRBD device.
  * @e:		epoch entry
  * @rw:		flag field, see bio->bi_rw
+ *
+ * May spread the pages to multiple bios,
+ * depending on bio_add_page restrictions.
+ *
+ * Returns 0 if all bios have been submitted,
+ * -ENOMEM if we could not allocate enough bios,
+ * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
+ *  single page to an empty bio (which should never happen and likely indicates
+ *  that the lower level IO stack is in some way broken). This has been observed
+ *  on certain Xen deployments.
  */
 /* TODO allocate from our own bio_set. */
 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
@@ -1085,6 +1095,7 @@
 	unsigned ds = e->size;
 	unsigned n_bios = 0;
 	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
+	int err = -ENOMEM;
 
 	/* In most cases, we will only need one bio.  But in case the lower
 	 * level restrictions happen to be different at this offset on this
@@ -1110,8 +1121,17 @@
 	page_chain_for_each(page) {
 		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
 		if (!bio_add_page(bio, page, len, 0)) {
-			/* a single page must always be possible! */
-			BUG_ON(bio->bi_vcnt == 0);
+			/* A single page must always be possible!
+			 * But in case it fails anyways,
+			 * we deal with it, and complain (below). */
+			if (bio->bi_vcnt == 0) {
+				dev_err(DEV,
+					"bio_add_page failed for len=%u, "
+					"bi_vcnt=0 (bi_sector=%llu)\n",
+					len, (unsigned long long)bio->bi_sector);
+				err = -ENOSPC;
+				goto fail;
+			}
 			goto next_bio;
 		}
 		ds -= len;
@@ -1137,7 +1157,7 @@
 		bios = bios->bi_next;
 		bio_put(bio);
 	}
-	return -ENOMEM;
+	return err;
 }
 
 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
@@ -1436,9 +1456,8 @@
 	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
 		return true;
 
-	/* drbd_submit_ee currently fails for one reason only:
-	 * not being able to allocate enough bios.
-	 * Is dropping the connection going to help? */
+	/* don't care for the reason here */
+	dev_err(DEV, "submit failed, triggering re-connect\n");
 	spin_lock_irq(&mdev->req_lock);
 	list_del(&e->w.list);
 	spin_unlock_irq(&mdev->req_lock);
@@ -1837,9 +1856,8 @@
 	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
 		return true;
 
-	/* drbd_submit_ee currently fails for one reason only:
-	 * not being able to allocate enough bios.
-	 * Is dropping the connection going to help? */
+	/* don't care for the reason here */
+	dev_err(DEV, "submit failed, triggering re-connect\n");
 	spin_lock_irq(&mdev->req_lock);
 	list_del(&e->w.list);
 	hlist_del_init(&e->colision);
@@ -1848,9 +1866,7 @@
 		drbd_al_complete_io(mdev, e->sector);
 
 out_interrupted:
-	/* yes, the epoch_size now is imbalanced.
-	 * but we drop the connection anyways, so we don't have a chance to
-	 * receive a barrier... atomic_inc(&mdev->epoch_size); */
+	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
 	put_ldev(mdev);
 	drbd_free_ee(mdev, e);
 	return false;
@@ -2096,9 +2112,8 @@
 	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
 		return true;
 
-	/* drbd_submit_ee currently fails for one reason only:
-	 * not being able to allocate enough bios.
-	 * Is dropping the connection going to help? */
+	/* don't care for the reason here */
+	dev_err(DEV, "submit failed, triggering re-connect\n");
 	spin_lock_irq(&mdev->req_lock);
 	list_del(&e->w.list);
 	spin_unlock_irq(&mdev->req_lock);