tipc: Major redesign of broadcast link ACK/NACK algorithms

Completely redesigns broadcast link ACK and NACK mechanisms to prevent
spurious retransmit requests in dual LAN networks, and to prevent the
broadcast link from stalling due to the failure of a receiving node to
acknowledge receiving a broadcast message or request its retransmission.

Note: These changes only impact the timing of when ACK and NACK messages
are sent, and not the basic broadcast link protocol itself, so inter-
operability with nodes using the "classic" algorithms is maintained.

The revised algorithms are as follows:

1) An explicit ACK message is still sent after receiving 16 in-sequence
messages, and implicit ACK information continues to be carried in other
unicast link message headers (including link state messages).  However,
the timing of explicit ACKs is now based on the receiving node's absolute
network address rather than its relative network address to ensure that
the failure of another node does not delay the ACK beyond its 16 message
target.

2) A NACK message is now typically sent only when a message gap persists
for two consecutive incoming link state messages; this ensures that a
suspected gap is not confirmed until both LANs in a dual LAN network have
had an opportunity to deliver the message, thereby preventing spurious NACKs.
A NACK message can also be generated by the arrival of a single link state
message, if the deferred queue is so big that the current message gap
cannot be the result of "normal" mis-ordering due to the use of dual LANs
(or one LAN using a bonded interface). Since link state messages typically
arrive at different nodes at different times the problem of multiple nodes
issuing identical NACKs simultaneously is inherently avoided.

3) Nodes continue to "peek" at NACK messages sent by other nodes. If
another node requests retransmission of a message gap suspected (but not
yet confirmed) by the peeking node, the peeking node forgets about the
gap and does not generate a duplicate retransmit request. (If the peeking
node subsequently fails to receive the lost message, later link state
messages will cause it to rediscover and confirm the gap and send another
NACK.)

4) Message gap "equality" is now determined by the start of the gap only.
This is sufficient to deal with the most common cases of message loss,
and eliminates the need for complex end of gap computations.

5) A peeking node no longer tries to determine whether it should send a
complementary NACK, since the most common cases of message loss don't
require it to be sent. Consequently, the node no longer examines the
"broadcast tag" field of a NACK message when peeking.

Signed-off-by: Allan Stephens <allan.stephens@windriver.com>
Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index facc216..1f3b160 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -157,39 +157,14 @@
 	return bcl->fsm_msg_cnt;
 }
 
-/**
- * bclink_set_gap - set gap according to contents of current deferred pkt queue
- *
- * Called with 'node' locked, bc_lock unlocked
- */
-
-static void bclink_set_gap(struct tipc_node *n_ptr)
+static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
 {
-	struct sk_buff *buf = n_ptr->bclink.deferred_head;
-
-	n_ptr->bclink.gap_after = n_ptr->bclink.gap_to =
-		mod(n_ptr->bclink.last_in);
-	if (unlikely(buf != NULL))
-		n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1);
-}
-
-/**
- * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment
- *
- * This mechanism endeavours to prevent all nodes in network from trying
- * to ACK or NACK at the same time.
- *
- * Note: TIPC uses a different trigger to distribute ACKs than it does to
- *       distribute NACKs, but tries to use the same spacing (divide by 16).
- */
-
-static int bclink_ack_allowed(u32 n)
-{
-	return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag;
+	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
+						seqno : node->bclink.last_sent;
 }
 
 
-/**
+/*
  * tipc_bclink_retransmit_to - get most recent node to request retransmission
  *
  * Called with bc_lock locked
@@ -300,44 +275,56 @@
 	spin_unlock_bh(&bc_lock);
 }
 
-/**
- * bclink_send_ack - unicast an ACK msg
+/*
+ * tipc_bclink_update_link_state - update broadcast link state
  *
  * tipc_net_lock and node lock set
  */
 
-static void bclink_send_ack(struct tipc_node *n_ptr)
-{
-	struct tipc_link *l_ptr = n_ptr->active_links[n_ptr->addr & 1];
-
-	if (l_ptr != NULL)
-		tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-}
-
-/**
- * bclink_send_nack- broadcast a NACK msg
- *
- * tipc_net_lock and node lock set
- */
-
-static void bclink_send_nack(struct tipc_node *n_ptr)
+void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 {
 	struct sk_buff *buf;
-	struct tipc_msg *msg;
 
-	if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to))
+	/* Ignore "stale" link state info */
+
+	if (less_eq(last_sent, n_ptr->bclink.last_in))
 		return;
 
+	/* Update link synchronization state; quit if in sync */
+
+	bclink_update_last_sent(n_ptr, last_sent);
+
+	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
+		return;
+
+	/* Update out-of-sync state; quit if loss is still unconfirmed */
+
+	if ((++n_ptr->bclink.oos_state) == 1) {
+		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
+			return;
+		n_ptr->bclink.oos_state++;
+	}
+
+	/* Don't NACK if one has been recently sent (or seen) */
+
+	if (n_ptr->bclink.oos_state & 0x1)
+		return;
+
+	/* Send NACK */
+
 	buf = tipc_buf_acquire(INT_H_SIZE);
 	if (buf) {
-		msg = buf_msg(buf);
+		struct tipc_msg *msg = buf_msg(buf);
+
 		tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
-			 INT_H_SIZE, n_ptr->addr);
+			      INT_H_SIZE, n_ptr->addr);
 		msg_set_non_seq(msg, 1);
 		msg_set_mc_netid(msg, tipc_net_id);
-		msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in));
-		msg_set_bcgap_after(msg, n_ptr->bclink.gap_after);
-		msg_set_bcgap_to(msg, n_ptr->bclink.gap_to);
+		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
+		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
+		msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
+				 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
+				 : n_ptr->bclink.last_sent);
 		msg_set_bcast_tag(msg, tipc_own_tag);
 
 		spin_lock_bh(&bc_lock);
@@ -346,96 +333,37 @@
 		spin_unlock_bh(&bc_lock);
 		buf_discard(buf);
 
-		/*
-		 * Ensure we doesn't send another NACK msg to the node
-		 * until 16 more deferred messages arrive from it
-		 * (i.e. helps prevent all nodes from NACK'ing at same time)
-		 */
-
-		n_ptr->bclink.nack_sync = tipc_own_tag;
+		n_ptr->bclink.oos_state++;
 	}
 }
 
-/**
- * tipc_bclink_check_gap - send a NACK if a sequence gap exists
+/*
+ * bclink_peek_nack - monitor retransmission requests sent by other nodes
  *
- * tipc_net_lock and node lock set
- */
-
-void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent)
-{
-	if (!n_ptr->bclink.supported ||
-	    less_eq(last_sent, mod(n_ptr->bclink.last_in)))
-		return;
-
-	bclink_set_gap(n_ptr);
-	if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to)
-		n_ptr->bclink.gap_to = last_sent;
-	bclink_send_nack(n_ptr);
-}
-
-/**
- * tipc_bclink_peek_nack - process a NACK msg meant for another node
+ * Delay any upcoming NACK by this node if another node has already
+ * requested the first message this node is going to ask for.
  *
  * Only tipc_net_lock set.
  */
 
-static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to)
+static void bclink_peek_nack(struct tipc_msg *msg)
 {
-	struct tipc_node *n_ptr = tipc_node_find(dest);
-	u32 my_after, my_to;
+	struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg));
 
-	if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr)))
+	if (unlikely(!n_ptr))
 		return;
+
 	tipc_node_lock(n_ptr);
-	/*
-	 * Modify gap to suppress unnecessary NACKs from this node
-	 */
-	my_after = n_ptr->bclink.gap_after;
-	my_to = n_ptr->bclink.gap_to;
 
-	if (less_eq(gap_after, my_after)) {
-		if (less(my_after, gap_to) && less(gap_to, my_to))
-			n_ptr->bclink.gap_after = gap_to;
-		else if (less_eq(my_to, gap_to))
-			n_ptr->bclink.gap_to = n_ptr->bclink.gap_after;
-	} else if (less_eq(gap_after, my_to)) {
-		if (less_eq(my_to, gap_to))
-			n_ptr->bclink.gap_to = gap_after;
-	} else {
-		/*
-		 * Expand gap if missing bufs not in deferred queue:
-		 */
-		struct sk_buff *buf = n_ptr->bclink.deferred_head;
-		u32 prev = n_ptr->bclink.gap_to;
+	if (n_ptr->bclink.supported &&
+	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
+	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
+		n_ptr->bclink.oos_state = 2;
 
-		for (; buf; buf = buf->next) {
-			u32 seqno = buf_seqno(buf);
-
-			if (mod(seqno - prev) != 1) {
-				buf = NULL;
-				break;
-			}
-			if (seqno == gap_after)
-				break;
-			prev = seqno;
-		}
-		if (buf == NULL)
-			n_ptr->bclink.gap_to = gap_after;
-	}
-	/*
-	 * Some nodes may send a complementary NACK now:
-	 */
-	if (bclink_ack_allowed(sender_tag + 1)) {
-		if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) {
-			bclink_send_nack(n_ptr);
-			bclink_set_gap(n_ptr);
-		}
-	}
 	tipc_node_unlock(n_ptr);
 }
 
-/**
+/*
  * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster
  */
 
@@ -505,10 +433,7 @@
 			spin_unlock_bh(&bc_lock);
 		} else {
 			tipc_node_unlock(node);
-			tipc_bclink_peek_nack(msg_destnode(msg),
-					      msg_bcast_tag(msg),
-					      msg_bcgap_after(msg),
-					      msg_bcgap_to(msg));
+			bclink_peek_nack(msg);
 		}
 		goto exit;
 	}
@@ -519,16 +444,28 @@
 	next_in = mod(node->bclink.last_in + 1);
 
 	if (likely(seqno == next_in)) {
+		bclink_update_last_sent(node, seqno);
 receive:
+		node->bclink.last_in = seqno;
+		node->bclink.oos_state = 0;
+
 		spin_lock_bh(&bc_lock);
 		bcl->stats.recv_info++;
-		node->bclink.last_in++;
-		bclink_set_gap(node);
-		if (unlikely(bclink_ack_allowed(seqno))) {
-			bclink_send_ack(node);
+
+		/*
+		 * Unicast an ACK periodically, ensuring that
+		 * all nodes in the cluster don't ACK at the same time
+		 */
+
+		if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
+			tipc_link_send_proto_msg(
+				node->active_links[node->addr & 1],
+				STATE_MSG, 0, 0, 0, 0, 0);
 			bcl->stats.sent_acks++;
 		}
 
+		/* Deliver message to destination */
+
 		if (likely(msg_isdata(msg))) {
 			spin_unlock_bh(&bc_lock);
 			tipc_node_unlock(node);
@@ -567,9 +504,14 @@
 		if (unlikely(!tipc_node_is_up(node)))
 			goto unlock;
 
-		if (!node->bclink.deferred_head)
+		if (node->bclink.last_in == node->bclink.last_sent)
 			goto unlock;
 
+		if (!node->bclink.deferred_head) {
+			node->bclink.oos_state = 1;
+			goto unlock;
+		}
+
 		msg = buf_msg(node->bclink.deferred_head);
 		seqno = msg_seqno(msg);
 		next_in = mod(next_in + 1);
@@ -580,31 +522,19 @@
 
 		buf = node->bclink.deferred_head;
 		node->bclink.deferred_head = buf->next;
+		node->bclink.deferred_size--;
 		goto receive;
 	}
 
 	/* Handle out-of-sequence broadcast message */
 
 	if (less(next_in, seqno)) {
-		u32 gap_after = node->bclink.gap_after;
-		u32 gap_to = node->bclink.gap_to;
-
 		deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
 					       &node->bclink.deferred_tail,
 					       buf);
-		if (deferred) {
-			node->bclink.nack_sync++;
-			if (seqno == mod(gap_after + 1))
-				node->bclink.gap_after = seqno;
-			else if (less(gap_after, seqno) && less(seqno, gap_to))
-				node->bclink.gap_to = seqno;
-		}
+		node->bclink.deferred_size += deferred;
+		bclink_update_last_sent(node, seqno);
 		buf = NULL;
-		if (bclink_ack_allowed(node->bclink.nack_sync)) {
-			if (gap_to != gap_after)
-				bclink_send_nack(node);
-			bclink_set_gap(node);
-		}
 	} else
 		deferred = 0;
 
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index b009666..5571394 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -96,7 +96,7 @@
 void tipc_bclink_recv_pkt(struct sk_buff *buf);
 u32  tipc_bclink_get_last_sent(void);
 u32  tipc_bclink_acks_missing(struct tipc_node *n_ptr);
-void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 seqno);
+void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent);
 int  tipc_bclink_stats(char *stats_buf, const u32 buf_size);
 int  tipc_bclink_reset_stats(void);
 int  tipc_bclink_set_queue_limits(u32 limit);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 1150ba5..cce9537 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1501,14 +1501,13 @@
 		tipc_node_lock(n_ptr);
 
 		tipc_addr_string_fill(addr_string, n_ptr->addr);
-		info("Multicast link info for %s\n", addr_string);
+		info("Broadcast link info for %s\n", addr_string);
 		info("Supportable: %d,  ", n_ptr->bclink.supportable);
 		info("Supported: %d,  ", n_ptr->bclink.supported);
 		info("Acked: %u\n", n_ptr->bclink.acked);
 		info("Last in: %u,  ", n_ptr->bclink.last_in);
-		info("Gap after: %u,  ", n_ptr->bclink.gap_after);
-		info("Gap to: %u\n", n_ptr->bclink.gap_to);
-		info("Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
+		info("Oos state: %u,  ", n_ptr->bclink.oos_state);
+		info("Last sent: %u\n", n_ptr->bclink.last_sent);
 
 		tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
 
@@ -1974,7 +1973,7 @@
 
 	msg_set_type(msg, msg_typ);
 	msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
-	msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
+	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
 	msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
 
 	if (msg_typ == STATE_MSG) {
@@ -2133,8 +2132,12 @@
 
 		/* Synchronize broadcast link info, if not done previously */
 
-		if (!tipc_node_is_up(l_ptr->owner))
-			l_ptr->owner->bclink.last_in = msg_last_bcast(msg);
+		if (!tipc_node_is_up(l_ptr->owner)) {
+			l_ptr->owner->bclink.last_sent =
+				l_ptr->owner->bclink.last_in =
+				msg_last_bcast(msg);
+			l_ptr->owner->bclink.oos_state = 0;
+		}
 
 		l_ptr->peer_session = msg_session(msg);
 		l_ptr->peer_bearer_id = msg_bearer_id(msg);
@@ -2181,7 +2184,9 @@
 
 		/* Protocol message before retransmits, reduce loss risk */
 
-		tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
+		if (l_ptr->owner->bclink.supported)
+			tipc_bclink_update_link_state(l_ptr->owner,
+						      msg_last_bcast(msg));
 
 		if (rec_gap || (msg_probe(msg))) {
 			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 9196f94..6d8bdfd 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -339,12 +339,12 @@
 	/* Flush broadcast link info associated with lost node */
 
 	if (n_ptr->bclink.supported) {
-		n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 0;
 		while (n_ptr->bclink.deferred_head) {
 			struct sk_buff *buf = n_ptr->bclink.deferred_head;
 			n_ptr->bclink.deferred_head = buf->next;
 			buf_discard(buf);
 		}
+		n_ptr->bclink.deferred_size = 0;
 
 		if (n_ptr->bclink.defragm) {
 			buf_discard(n_ptr->bclink.defragm);
@@ -450,7 +450,7 @@
 
 	read_lock_bh(&tipc_net_lock);
 
-	/* Get space for all unicast links + multicast link */
+	/* Get space for all unicast links + broadcast link */
 
 	payload_size = TLV_SPACE(sizeof(link_info)) *
 		(atomic_read(&tipc_num_links) + 1);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 90689f4..c88ce64 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -66,9 +66,9 @@
  *    @supported: non-zero if node supports TIPC b'cast capability
  *    @acked: sequence # of last outbound b'cast message acknowledged by node
  *    @last_in: sequence # of last in-sequence b'cast message received from node
- *    @gap_after: sequence # of last message not requiring a NAK request
- *    @gap_to: sequence # of last message requiring a NAK request
- *    @nack_sync: counter that determines when NAK requests should be sent
+ *    @last_sent: sequence # of last b'cast message sent by node
+ *    @oos_state: state tracker for handling OOS b'cast messages
+ *    @deferred_size: number of OOS b'cast messages in deferred queue
  *    @deferred_head: oldest OOS b'cast message received from node
  *    @deferred_tail: newest OOS b'cast message received from node
  *    @defragm: list of partially reassembled b'cast message fragments from node
@@ -91,9 +91,9 @@
 		u8 supported;
 		u32 acked;
 		u32 last_in;
-		u32 gap_after;
-		u32 gap_to;
-		u32 nack_sync;
+		u32 last_sent;
+		u32 oos_state;
+		u32 deferred_size;
 		struct sk_buff *deferred_head;
 		struct sk_buff *deferred_tail;
 		struct sk_buff *defragm;