mac80211: serialize rx path workers

This patch addresses the issue of serialization between
the main rx path and various reorder release timers.

<http://www.spinics.net/lists/linux-wireless/msg57214.html>

It converts the previously local "frames" queue into
a global rx queue [rx_skb_queue]. This way, everyone
(be it the main rx-path or some reorder release timeout)
can add frames to it.

Only one active rx handler worker [ieee80211_rx_handlers]
is needed. All other threads which have lost the race of
"runnning_rx_handler" can now simply "return", knowing that
the thread who had the "edge" will also take care of their
workload.

Signed-off-by: Christian Lamparter <chunkeey@googlemail.com>
Signed-off-by: John W. Linville <linville@tuxdriver.com>
diff --git a/net/mac80211/ieee80211_i.h b/net/mac80211/ieee80211_i.h
index a9eb673..1bf2fc8 100644
--- a/net/mac80211/ieee80211_i.h
+++ b/net/mac80211/ieee80211_i.h
@@ -761,6 +761,15 @@
 	struct sk_buff_head skb_queue;
 	struct sk_buff_head skb_queue_unreliable;
 
+	/*
+	 * Internal FIFO queue which is shared between multiple rx path
+	 * stages. Its main task is to provide a serialization mechanism,
+	 * so all rx handlers can enjoy having exclusive access to their
+	 * private data structures.
+	 */
+	struct sk_buff_head rx_skb_queue;
+	bool running_rx_handler;	/* protected by rx_skb_queue.lock */
+
 	/* Station data */
 	/*
 	 * The mutex only protects the list and counter,
diff --git a/net/mac80211/main.c b/net/mac80211/main.c
index a21d049..32e7ae0 100644
--- a/net/mac80211/main.c
+++ b/net/mac80211/main.c
@@ -569,6 +569,8 @@
 	spin_lock_init(&local->filter_lock);
 	spin_lock_init(&local->queue_stop_reason_lock);
 
+	skb_queue_head_init(&local->rx_skb_queue);
+
 	INIT_DELAYED_WORK(&local->scan_work, ieee80211_scan_work);
 
 	ieee80211_work_init(local);
@@ -912,6 +914,7 @@
 		wiphy_warn(local->hw.wiphy, "skb_queue not empty\n");
 	skb_queue_purge(&local->skb_queue);
 	skb_queue_purge(&local->skb_queue_unreliable);
+	skb_queue_purge(&local->rx_skb_queue);
 
 	destroy_workqueue(local->workqueue);
 	wiphy_unregister(local->hw.wiphy);
diff --git a/net/mac80211/rx.c b/net/mac80211/rx.c
index 260b48b..12cbb4d 100644
--- a/net/mac80211/rx.c
+++ b/net/mac80211/rx.c
@@ -533,9 +533,9 @@
 
 static void ieee80211_release_reorder_frame(struct ieee80211_hw *hw,
 					    struct tid_ampdu_rx *tid_agg_rx,
-					    int index,
-					    struct sk_buff_head *frames)
+					    int index)
 {
+	struct ieee80211_local *local = hw_to_local(hw);
 	struct sk_buff *skb = tid_agg_rx->reorder_buf[index];
 	struct ieee80211_rx_status *status;
 
@@ -549,7 +549,7 @@
 	tid_agg_rx->reorder_buf[index] = NULL;
 	status = IEEE80211_SKB_RXCB(skb);
 	status->rx_flags |= IEEE80211_RX_DEFERRED_RELEASE;
-	__skb_queue_tail(frames, skb);
+	skb_queue_tail(&local->rx_skb_queue, skb);
 
 no_frame:
 	tid_agg_rx->head_seq_num = seq_inc(tid_agg_rx->head_seq_num);
@@ -557,8 +557,7 @@
 
 static void ieee80211_release_reorder_frames(struct ieee80211_hw *hw,
 					     struct tid_ampdu_rx *tid_agg_rx,
-					     u16 head_seq_num,
-					     struct sk_buff_head *frames)
+					     u16 head_seq_num)
 {
 	int index;
 
@@ -567,7 +566,7 @@
 	while (seq_less(tid_agg_rx->head_seq_num, head_seq_num)) {
 		index = seq_sub(tid_agg_rx->head_seq_num, tid_agg_rx->ssn) %
 							tid_agg_rx->buf_size;
-		ieee80211_release_reorder_frame(hw, tid_agg_rx, index, frames);
+		ieee80211_release_reorder_frame(hw, tid_agg_rx, index);
 	}
 }
 
@@ -583,8 +582,7 @@
 #define HT_RX_REORDER_BUF_TIMEOUT (HZ / 10)
 
 static void ieee80211_sta_reorder_release(struct ieee80211_hw *hw,
-					  struct tid_ampdu_rx *tid_agg_rx,
-					  struct sk_buff_head *frames)
+					  struct tid_ampdu_rx *tid_agg_rx)
 {
 	int index, j;
 
@@ -615,8 +613,7 @@
 				wiphy_debug(hw->wiphy,
 					    "release an RX reorder frame due to timeout on earlier frames\n");
 #endif
-			ieee80211_release_reorder_frame(hw, tid_agg_rx,
-							j, frames);
+			ieee80211_release_reorder_frame(hw, tid_agg_rx, j);
 
 			/*
 			 * Increment the head seq# also for the skipped slots.
@@ -626,7 +623,7 @@
 			skipped = 0;
 		}
 	} else while (tid_agg_rx->reorder_buf[index]) {
-		ieee80211_release_reorder_frame(hw, tid_agg_rx, index, frames);
+		ieee80211_release_reorder_frame(hw, tid_agg_rx, index);
 		index =	seq_sub(tid_agg_rx->head_seq_num, tid_agg_rx->ssn) %
 							tid_agg_rx->buf_size;
 	}
@@ -682,8 +679,7 @@
  */
 static bool ieee80211_sta_manage_reorder_buf(struct ieee80211_hw *hw,
 					     struct tid_ampdu_rx *tid_agg_rx,
-					     struct sk_buff *skb,
-					     struct sk_buff_head *frames)
+					     struct sk_buff *skb)
 {
 	struct ieee80211_hdr *hdr = (struct ieee80211_hdr *) skb->data;
 	u16 sc = le16_to_cpu(hdr->seq_ctrl);
@@ -710,8 +706,7 @@
 	if (!seq_less(mpdu_seq_num, head_seq_num + buf_size)) {
 		head_seq_num = seq_inc(seq_sub(mpdu_seq_num, buf_size));
 		/* release stored frames up to new head to stack */
-		ieee80211_release_reorder_frames(hw, tid_agg_rx, head_seq_num,
-						 frames);
+		ieee80211_release_reorder_frames(hw, tid_agg_rx, head_seq_num);
 	}
 
 	/* Now the new frame is always in the range of the reordering buffer */
@@ -739,7 +734,7 @@
 	tid_agg_rx->reorder_buf[index] = skb;
 	tid_agg_rx->reorder_time[index] = jiffies;
 	tid_agg_rx->stored_mpdu_num++;
-	ieee80211_sta_reorder_release(hw, tid_agg_rx, frames);
+	ieee80211_sta_reorder_release(hw, tid_agg_rx);
 
  out:
 	spin_unlock(&tid_agg_rx->reorder_lock);
@@ -750,8 +745,7 @@
  * Reorder MPDUs from A-MPDUs, keeping them on a buffer. Returns
  * true if the MPDU was buffered, false if it should be processed.
  */
-static void ieee80211_rx_reorder_ampdu(struct ieee80211_rx_data *rx,
-				       struct sk_buff_head *frames)
+static void ieee80211_rx_reorder_ampdu(struct ieee80211_rx_data *rx)
 {
 	struct sk_buff *skb = rx->skb;
 	struct ieee80211_local *local = rx->local;
@@ -806,11 +800,11 @@
 	 * sure that we cannot get to it any more before doing
 	 * anything with it.
 	 */
-	if (ieee80211_sta_manage_reorder_buf(hw, tid_agg_rx, skb, frames))
+	if (ieee80211_sta_manage_reorder_buf(hw, tid_agg_rx, skb))
 		return;
 
  dont_reorder:
-	__skb_queue_tail(frames, skb);
+	skb_queue_tail(&local->rx_skb_queue, skb);
 }
 
 static ieee80211_rx_result debug_noinline
@@ -1931,7 +1925,7 @@
 }
 
 static ieee80211_rx_result debug_noinline
-ieee80211_rx_h_ctrl(struct ieee80211_rx_data *rx, struct sk_buff_head *frames)
+ieee80211_rx_h_ctrl(struct ieee80211_rx_data *rx)
 {
 	struct ieee80211_local *local = rx->local;
 	struct ieee80211_hw *hw = &local->hw;
@@ -1971,8 +1965,7 @@
 
 		spin_lock(&tid_agg_rx->reorder_lock);
 		/* release stored frames up to start of BAR */
-		ieee80211_release_reorder_frames(hw, tid_agg_rx, start_seq_num,
-						 frames);
+		ieee80211_release_reorder_frames(hw, tid_agg_rx, start_seq_num);
 		spin_unlock(&tid_agg_rx->reorder_lock);
 
 		kfree_skb(skb);
@@ -2489,8 +2482,7 @@
 	}
 }
 
-static void ieee80211_rx_handlers(struct ieee80211_rx_data *rx,
-				  struct sk_buff_head *frames)
+static void ieee80211_rx_handlers(struct ieee80211_rx_data *rx)
 {
 	ieee80211_rx_result res = RX_DROP_MONITOR;
 	struct sk_buff *skb;
@@ -2502,7 +2494,15 @@
 			goto rxh_next;  \
 	} while (0);
 
-	while ((skb = __skb_dequeue(frames))) {
+	spin_lock(&rx->local->rx_skb_queue.lock);
+	if (rx->local->running_rx_handler)
+		goto unlock;
+
+	rx->local->running_rx_handler = true;
+
+	while ((skb = __skb_dequeue(&rx->local->rx_skb_queue))) {
+		spin_unlock(&rx->local->rx_skb_queue.lock);
+
 		/*
 		 * all the other fields are valid across frames
 		 * that belong to an aMPDU since they are on the
@@ -2525,12 +2525,7 @@
 			CALL_RXH(ieee80211_rx_h_mesh_fwding);
 #endif
 		CALL_RXH(ieee80211_rx_h_data)
-
-		/* special treatment -- needs the queue */
-		res = ieee80211_rx_h_ctrl(rx, frames);
-		if (res != RX_CONTINUE)
-			goto rxh_next;
-
+		CALL_RXH(ieee80211_rx_h_ctrl);
 		CALL_RXH(ieee80211_rx_h_mgmt_check)
 		CALL_RXH(ieee80211_rx_h_action)
 		CALL_RXH(ieee80211_rx_h_userspace_mgmt)
@@ -2539,18 +2534,20 @@
 
  rxh_next:
 		ieee80211_rx_handlers_result(rx, res);
-
+		spin_lock(&rx->local->rx_skb_queue.lock);
 #undef CALL_RXH
 	}
+
+	rx->local->running_rx_handler = false;
+
+ unlock:
+	spin_unlock(&rx->local->rx_skb_queue.lock);
 }
 
 static void ieee80211_invoke_rx_handlers(struct ieee80211_rx_data *rx)
 {
-	struct sk_buff_head reorder_release;
 	ieee80211_rx_result res = RX_DROP_MONITOR;
 
-	__skb_queue_head_init(&reorder_release);
-
 #define CALL_RXH(rxh)			\
 	do {				\
 		res = rxh(rx);		\
@@ -2561,9 +2558,9 @@
 	CALL_RXH(ieee80211_rx_h_passive_scan)
 	CALL_RXH(ieee80211_rx_h_check)
 
-	ieee80211_rx_reorder_ampdu(rx, &reorder_release);
+	ieee80211_rx_reorder_ampdu(rx);
 
-	ieee80211_rx_handlers(rx, &reorder_release);
+	ieee80211_rx_handlers(rx);
 	return;
 
  rxh_next:
@@ -2578,7 +2575,6 @@
  */
 void ieee80211_release_reorder_timeout(struct sta_info *sta, int tid)
 {
-	struct sk_buff_head frames;
 	struct ieee80211_rx_data rx = {
 		.sta = sta,
 		.sdata = sta->sdata,
@@ -2591,13 +2587,11 @@
 	if (!tid_agg_rx)
 		return;
 
-	__skb_queue_head_init(&frames);
-
 	spin_lock(&tid_agg_rx->reorder_lock);
-	ieee80211_sta_reorder_release(&sta->local->hw, tid_agg_rx, &frames);
+	ieee80211_sta_reorder_release(&sta->local->hw, tid_agg_rx);
 	spin_unlock(&tid_agg_rx->reorder_lock);
 
-	ieee80211_rx_handlers(&rx, &frames);
+	ieee80211_rx_handlers(&rx);
 }
 
 /* main receive path */