Performance optimization for edit monitor
Instead of logging every edit event immediately when it is received,
cache the events and log the cached events in batches periodically. When
many edit events are received in a short time (probably due to a
non-human operation such as repo sync), send a single aggregated edit
event instead to prevent performance degradation.
Test: atest edit_monitor_test
Bug: 365617369
Change-Id: Ibe1613cf1e2eb37ebc5dfa5c029b990854fcf91e
diff --git a/tools/edit_monitor/edit_monitor.py b/tools/edit_monitor/edit_monitor.py
index defc841..31115d4 100644
--- a/tools/edit_monitor/edit_monitor.py
+++ b/tools/edit_monitor/edit_monitor.py
@@ -19,6 +19,7 @@
import os
import pathlib
import platform
+import threading
import time
from atest.metrics import clearcut_client
@@ -31,22 +32,34 @@
# Enum of the Clearcut log source defined under
# /google3/wireless/android/play/playlog/proto/log_source_enum.proto
LOG_SOURCE = 2524
+DEFAULT_FLUSH_INTERVAL_SECONDS = 5
+DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100
class ClearcutEventHandler(PatternMatchingEventHandler):
def __init__(
- self, path: str, cclient: clearcut_client.Clearcut | None = None
+ self,
+ path: str,
+ flush_interval_sec: int,
+ single_events_size_threshold: int,
+ cclient: clearcut_client.Clearcut | None = None,
):
super().__init__(patterns=["*"], ignore_directories=True)
self.root_monitoring_path = path
+ self.flush_interval_sec = flush_interval_sec
+ self.single_events_size_threshold = single_events_size_threshold
self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE)
self.user_name = getpass.getuser()
self.host_name = platform.node()
self.source_root = os.environ.get("ANDROID_BUILD_TOP", "")
+ self.pending_events = []
+ self._scheduled_log_thread = None
+ self._pending_events_lock = threading.Lock()
+
def on_moved(self, event: FileSystemEvent):
self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE)
@@ -61,6 +74,12 @@
def flushall(self):
logging.info("flushing all pending events.")
+ if self._scheduled_log_thread:
+ logging.info("canceling log thread")
+ self._scheduled_log_thread.cancel()
+ self._scheduled_log_thread = None
+
+ self._log_clearcut_events()
self.cclient.flush_events()
def _log_edit_event(
@@ -92,12 +111,17 @@
file_path=event.src_path, edit_type=edit_type
)
)
- clearcut_log_event = clientanalytics_pb2.LogEvent(
- event_time_ms=int(event_time * 1000),
- source_extension=event_proto.SerializeToString(),
- )
+ with self._pending_events_lock:
+ self.pending_events.append((event_proto, event_time))
+ if not self._scheduled_log_thread:
+ logging.debug(
+ "Scheduling thread to run in %d seconds", self.flush_interval_sec
+ )
+ self._scheduled_log_thread = threading.Timer(
+ self.flush_interval_sec, self._log_clearcut_events
+ )
+ self._scheduled_log_thread.start()
- self.cclient.log(clearcut_log_event)
except Exception:
logging.exception("Failed to log edit event.")
@@ -114,9 +138,46 @@
for dir in file_path.relative_to(root_path).parents
)
+ def _log_clearcut_events(self):
+ with self._pending_events_lock:
+ self._scheduled_log_thread = None
+ edit_events = self.pending_events
+ self.pending_events = []
+
+ pending_events_size = len(edit_events)
+ if pending_events_size > self.single_events_size_threshold:
+ logging.info(
+ "got %d events in %d seconds, sending aggregated events instead",
+ pending_events_size,
+ self.flush_interval_sec,
+ )
+ aggregated_event_time = edit_events[0][1]
+ aggregated_event_proto = edit_event_pb2.EditEvent(
+ user_name=self.user_name,
+ host_name=self.host_name,
+ source_root=self.source_root,
+ )
+ aggregated_event_proto.aggregated_edit_event.CopyFrom(
+ edit_event_pb2.EditEvent.AggregatedEditEvent(
+ num_edits=pending_events_size
+ )
+ )
+ edit_events = [(aggregated_event_proto, aggregated_event_time)]
+
+ for event_proto, event_time in edit_events:
+ log_event = clientanalytics_pb2.LogEvent(
+ event_time_ms=int(event_time * 1000),
+ source_extension=event_proto.SerializeToString(),
+ )
+ self.cclient.log(log_event)
+
+ logging.info("sent %d edit events", len(edit_events))
+
def start(
path: str,
+ flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS,
+ single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD,
cclient: clearcut_client.Clearcut | None = None,
pipe_sender: multiprocessing.connection.Connection | None = None,
):
@@ -130,7 +191,8 @@
cclient: The clearcut client to send the edit logs.
conn: the sender of the pipe to communicate with the deamon manager.
"""
- event_handler = ClearcutEventHandler(path, cclient)
+ event_handler = ClearcutEventHandler(
+ path, flush_interval_sec, single_events_size_threshold, cclient)
observer = Observer()
logging.info("Starting observer on path %s.", path)