Throttle control interface event message bursts

Some operations like a new scan result processing can result in large
number of wpa_supplicant control interface messages being generated.
Especially with multiple control interface monitors, this could result
in hitting the output queue length maximum and event messages getting
dropped. In worst case, that could even result in hitting ten
consecutive sendto() errors which could result in an attached monitor
socket getting detached.

Avoid this type of issues by throttling monitor event transmission based
on the output queue length. If more than half of the maximum send buffer
is used, postpone sending of following event messages until the pending
output queue has dropped below the limit.

Bug: 24270573

Change-Id: I3905f3ec925ee3bdc67f7fb93991e620a2a6ee57
Signed-off-by: Jouni Malinen <j@w1.fi>
Signed-off-by: Dmitry Shmidt <dimitrysh@google.com>
diff --git a/wpa_supplicant/ctrl_iface_unix.c b/wpa_supplicant/ctrl_iface_unix.c
index b6d5246..160a6f0 100644
--- a/wpa_supplicant/ctrl_iface_unix.c
+++ b/wpa_supplicant/ctrl_iface_unix.c
@@ -52,6 +52,8 @@
 	int sock;
 	struct dl_list ctrl_dst;
 	int android_control_socket;
+	struct dl_list msg_queue;
+	unsigned int throttle_count;
 };
 
 
@@ -60,6 +62,17 @@
 	int sock;
 	struct dl_list ctrl_dst;
 	int android_control_socket;
+	struct dl_list msg_queue;
+	unsigned int throttle_count;
+};
+
+struct ctrl_iface_msg {
+	struct dl_list list;
+	struct wpa_supplicant *wpa_s;
+	int level;
+	enum wpa_msg_type type;
+	const char *txt;
+	size_t len;
 };
 
 
@@ -334,33 +347,209 @@
 }
 
 
+static int wpas_ctrl_iface_throttle(int sock)
+{
+#ifdef __linux__
+	socklen_t optlen;
+	int sndbuf, outq;
+
+	optlen = sizeof(sndbuf);
+	sndbuf = 0;
+	if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, &optlen) < 0 ||
+	    ioctl(sock, SIOCOUTQ, &outq) < 0 ||
+	    sndbuf <= 0 || outq < 0)
+		return 0;
+	return outq > sndbuf / 2;
+#else /* __linux__ */
+	return 0;
+#endif /* __linux__ */
+}
+
+
+static void wpas_ctrl_msg_send_pending_global(struct wpa_global *global)
+{
+	struct ctrl_iface_global_priv *gpriv;
+	struct ctrl_iface_msg *msg;
+
+	gpriv = global->ctrl_iface;
+	while (gpriv && !dl_list_empty(&gpriv->msg_queue) &&
+	       !wpas_ctrl_iface_throttle(gpriv->sock)) {
+		msg = dl_list_first(&gpriv->msg_queue, struct ctrl_iface_msg,
+				    list);
+		if (!msg)
+			break;
+		dl_list_del(&msg->list);
+		wpa_supplicant_ctrl_iface_send(
+			msg->wpa_s,
+			msg->type != WPA_MSG_PER_INTERFACE ?
+			NULL : msg->wpa_s->ifname,
+			gpriv->sock, &gpriv->ctrl_dst, msg->level,
+			msg->txt, msg->len, NULL, gpriv);
+		os_free(msg);
+	}
+}
+
+
+static void wpas_ctrl_msg_send_pending_iface(struct wpa_supplicant *wpa_s)
+{
+	struct ctrl_iface_priv *priv;
+	struct ctrl_iface_msg *msg;
+
+	priv = wpa_s->ctrl_iface;
+	while (priv && !dl_list_empty(&priv->msg_queue) &&
+	       !wpas_ctrl_iface_throttle(priv->sock)) {
+		msg = dl_list_first(&priv->msg_queue, struct ctrl_iface_msg,
+				    list);
+		if (!msg)
+			break;
+		dl_list_del(&msg->list);
+		wpa_supplicant_ctrl_iface_send(wpa_s, NULL, priv->sock,
+					       &priv->ctrl_dst, msg->level,
+					       msg->txt, msg->len, priv, NULL);
+		os_free(msg);
+	}
+}
+
+
+static void wpas_ctrl_msg_queue_timeout(void *eloop_ctx, void *timeout_ctx)
+{
+	struct wpa_supplicant *wpa_s = eloop_ctx;
+	struct ctrl_iface_priv *priv;
+	struct ctrl_iface_global_priv *gpriv;
+	int sock = -1, gsock = -1;
+
+	wpas_ctrl_msg_send_pending_global(wpa_s->global);
+	wpas_ctrl_msg_send_pending_iface(wpa_s);
+
+	priv = wpa_s->ctrl_iface;
+	if (priv && !dl_list_empty(&priv->msg_queue))
+		sock = priv->sock;
+
+	gpriv = wpa_s->global->ctrl_iface;
+	if (gpriv && !dl_list_empty(&gpriv->msg_queue))
+		gsock = gpriv->sock;
+
+	if (sock > -1 || gsock > -1) {
+		/* Continue pending message transmission from a timeout */
+		wpa_printf(MSG_MSGDUMP,
+			   "CTRL: Had to throttle pending event message transmission for (sock %d gsock %d)",
+			   sock, gsock);
+		eloop_register_timeout(0, 20000, wpas_ctrl_msg_queue_timeout,
+				       wpa_s, NULL);
+	}
+}
+
+
+static void wpas_ctrl_msg_queue(struct dl_list *queue,
+				struct wpa_supplicant *wpa_s, int level,
+				enum wpa_msg_type type,
+				const char *txt, size_t len)
+{
+	struct ctrl_iface_msg *msg;
+
+	msg = os_zalloc(sizeof(*msg) + len);
+	if (!msg)
+		return;
+
+	msg->wpa_s = wpa_s;
+	msg->level = level;
+	msg->type = type;
+	os_memcpy(msg + 1, txt, len);
+	msg->txt = (const char *) (msg + 1);
+	msg->len = len;
+	dl_list_add_tail(queue, &msg->list);
+	eloop_cancel_timeout(wpas_ctrl_msg_queue_timeout, wpa_s, NULL);
+	eloop_register_timeout(0, 0, wpas_ctrl_msg_queue_timeout, wpa_s, NULL);
+}
+
+
+static void wpas_ctrl_msg_queue_limit(unsigned int throttle_count,
+				      struct dl_list *queue)
+{
+	struct ctrl_iface_msg *msg;
+
+	if (throttle_count < 2000)
+		return;
+
+	msg = dl_list_first(queue, struct ctrl_iface_msg, list);
+	if (msg) {
+		wpa_printf(MSG_DEBUG, "CTRL: Dropped oldest pending message");
+		dl_list_del(&msg->list);
+		os_free(msg);
+	}
+}
+
+
 static void wpa_supplicant_ctrl_iface_msg_cb(void *ctx, int level,
 					     enum wpa_msg_type type,
 					     const char *txt, size_t len)
 {
 	struct wpa_supplicant *wpa_s = ctx;
+	struct ctrl_iface_priv *priv;
+	struct ctrl_iface_global_priv *gpriv;
 
 	if (wpa_s == NULL)
 		return;
 
-	if (type != WPA_MSG_NO_GLOBAL && wpa_s->global->ctrl_iface) {
-		struct ctrl_iface_global_priv *priv = wpa_s->global->ctrl_iface;
-		if (!dl_list_empty(&priv->ctrl_dst)) {
+	gpriv = wpa_s->global->ctrl_iface;
+
+	if (type != WPA_MSG_NO_GLOBAL && gpriv &&
+	    !dl_list_empty(&gpriv->ctrl_dst)) {
+		if (!dl_list_empty(&gpriv->msg_queue) ||
+		    wpas_ctrl_iface_throttle(gpriv->sock)) {
+			if (gpriv->throttle_count == 0) {
+				wpa_printf(MSG_MSGDUMP,
+					   "CTRL: Had to throttle global event message for sock %d",
+					   gpriv->sock);
+			}
+			gpriv->throttle_count++;
+			wpas_ctrl_msg_queue_limit(gpriv->throttle_count,
+						  &gpriv->msg_queue);
+			wpas_ctrl_msg_queue(&gpriv->msg_queue, wpa_s, level,
+					    type, txt, len);
+		} else {
+			if (gpriv->throttle_count) {
+				wpa_printf(MSG_MSGDUMP,
+					   "CTRL: Had to throttle %u global event message(s) for sock %d",
+					   gpriv->throttle_count, gpriv->sock);
+			}
+			gpriv->throttle_count = 0;
 			wpa_supplicant_ctrl_iface_send(
 				wpa_s,
 				type != WPA_MSG_PER_INTERFACE ?
 				NULL : wpa_s->ifname,
-				priv->sock, &priv->ctrl_dst, level, txt, len,
-				NULL, priv);
+				gpriv->sock, &gpriv->ctrl_dst, level,
+				txt, len, NULL, gpriv);
 		}
 	}
 
-	if (type == WPA_MSG_ONLY_GLOBAL || wpa_s->ctrl_iface == NULL)
-		return;
-	wpa_supplicant_ctrl_iface_send(wpa_s, NULL, wpa_s->ctrl_iface->sock,
-				       &wpa_s->ctrl_iface->ctrl_dst,
-				       level, txt, len, wpa_s->ctrl_iface,
-				       NULL);
+	priv = wpa_s->ctrl_iface;
+
+	if (type != WPA_MSG_ONLY_GLOBAL && priv) {
+		if (!dl_list_empty(&priv->msg_queue) ||
+		    wpas_ctrl_iface_throttle(priv->sock)) {
+			if (priv->throttle_count == 0) {
+				wpa_printf(MSG_MSGDUMP,
+					   "CTRL: Had to throttle event message for sock %d",
+					   priv->sock);
+			}
+			priv->throttle_count++;
+			wpas_ctrl_msg_queue_limit(priv->throttle_count,
+						  &priv->msg_queue);
+			wpas_ctrl_msg_queue(&priv->msg_queue, wpa_s, level,
+					    type, txt, len);
+		} else {
+			if (priv->throttle_count) {
+				wpa_printf(MSG_MSGDUMP,
+					   "CTRL: Had to throttle %u event message(s) for sock %d",
+					   priv->throttle_count, priv->sock);
+			}
+			priv->throttle_count = 0;
+			wpa_supplicant_ctrl_iface_send(wpa_s, NULL, priv->sock,
+						       &priv->ctrl_dst, level,
+						       txt, len, priv, NULL);
+		}
+	}
 }
 
 
@@ -578,6 +767,7 @@
 	if (priv == NULL)
 		return NULL;
 	dl_list_init(&priv->ctrl_dst);
+	dl_list_init(&priv->msg_queue);
 	priv->wpa_s = wpa_s;
 	priv->sock = -1;
 
@@ -624,6 +814,8 @@
 void wpa_supplicant_ctrl_iface_deinit(struct ctrl_iface_priv *priv)
 {
 	struct wpa_ctrl_dst *dst, *prev;
+	struct ctrl_iface_msg *msg, *prev_msg;
+	struct ctrl_iface_global_priv *gpriv;
 
 	if (priv->sock > -1) {
 		char *fname;
@@ -679,6 +871,22 @@
 	dl_list_for_each_safe(dst, prev, &priv->ctrl_dst, struct wpa_ctrl_dst,
 			      list)
 		os_free(dst);
+	dl_list_for_each_safe(msg, prev_msg, &priv->msg_queue,
+			      struct ctrl_iface_msg, list) {
+		dl_list_del(&msg->list);
+		os_free(msg);
+	}
+	gpriv = priv->wpa_s->global->ctrl_iface;
+	if (gpriv) {
+		dl_list_for_each_safe(msg, prev_msg, &gpriv->msg_queue,
+				      struct ctrl_iface_msg, list) {
+			if (msg->wpa_s == priv->wpa_s) {
+				dl_list_del(&msg->list);
+				os_free(msg);
+			}
+		}
+	}
+	eloop_cancel_timeout(wpas_ctrl_msg_queue_timeout, priv->wpa_s, NULL);
 	os_free(priv);
 }
 
@@ -1107,6 +1315,7 @@
 	if (priv == NULL)
 		return NULL;
 	dl_list_init(&priv->ctrl_dst);
+	dl_list_init(&priv->msg_queue);
 	priv->global = global;
 	priv->sock = -1;
 
@@ -1156,6 +1365,7 @@
 wpa_supplicant_global_ctrl_iface_deinit(struct ctrl_iface_global_priv *priv)
 {
 	struct wpa_ctrl_dst *dst, *prev;
+	struct ctrl_iface_msg *msg, *prev_msg;
 
 	if (priv->sock >= 0) {
 		eloop_unregister_read_sock(priv->sock);
@@ -1166,5 +1376,10 @@
 	dl_list_for_each_safe(dst, prev, &priv->ctrl_dst, struct wpa_ctrl_dst,
 			      list)
 		os_free(dst);
+	dl_list_for_each_safe(msg, prev_msg, &priv->msg_queue,
+			      struct ctrl_iface_msg, list) {
+		dl_list_del(&msg->list);
+		os_free(msg);
+	}
 	os_free(priv);
 }