Performance optimization for edit monitor
Instead of logging every edit event immediately when it is received,
cache the events and log the cached events in batches periodically.
When many edit events are received in a short time (likely due to a
non-human operation such as a repo sync), send a single aggregated edit
event instead to prevent performance degradation.
Test: atest edit_monitor_test
Bug: 365617369
Change-Id: Ibe1613cf1e2eb37ebc5dfa5c029b990854fcf91e
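For reference, below is a minimal, illustrative sketch of the batch-and-
aggregate pattern this change implements. It is not the code in the diff;
the class and parameter names (BatchLogger, sink, flush_interval_sec,
batch_threshold) are assumptions made for this example only.

# Illustrative sketch only; the real implementation is in
# tools/edit_monitor/edit_monitor.py in the diff below.
import threading


class BatchLogger:
  """Buffers events and flushes them on a timer, aggregating large bursts."""

  def __init__(self, sink, flush_interval_sec=5, batch_threshold=100):
    self._sink = sink  # callable that sends a single event downstream
    self._flush_interval_sec = flush_interval_sec
    self._batch_threshold = batch_threshold
    self._pending = []
    self._timer = None
    self._lock = threading.Lock()

  def add(self, event):
    with self._lock:
      self._pending.append(event)
      # Schedule at most one delayed flush per batch.
      if self._timer is None:
        self._timer = threading.Timer(self._flush_interval_sec, self.flush)
        self._timer.start()

  def flush(self):
    with self._lock:
      self._timer = None
      events, self._pending = self._pending, []
    if len(events) > self._batch_threshold:
      # A burst (e.g. a repo sync) collapses into one aggregated event.
      self._sink({"aggregated": True, "num_edits": len(events)})
      return
    for event in events:
      self._sink(event)

Calling add() repeatedly within flush_interval_sec results in at most one
flush, and a burst larger than batch_threshold is reported as a single
aggregated event; this is the behavior exercised by
test_log_aggregated_edit_event_success in the test diff below.
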
diff --git a/tools/edit_monitor/daemon_manager.py b/tools/edit_monitor/daemon_manager.py
index 892c292..4ff4ec8 100644
--- a/tools/edit_monitor/daemon_manager.py
+++ b/tools/edit_monitor/daemon_manager.py
@@ -133,8 +133,12 @@
logging.debug("in daemon manager cleanup.")
try:
- if self.daemon_process and self.daemon_process.is_alive():
- self._terminate_process(self.daemon_process.pid)
+ if self.daemon_process:
+ # The daemon process might already be terminating; wait a short
+ # time before killing it explicitly.
+ self._wait_for_process_terminate(self.daemon_process.pid, 1)
+ if self.daemon_process.is_alive():
+ self._terminate_process(self.daemon_process.pid)
self._remove_pidfile()
logging.debug("Successfully stopped daemon manager.")
except Exception as e:
@@ -227,6 +231,7 @@
p = multiprocessing.Process(
target=self.daemon_target, args=self.daemon_args
)
+ p.daemon = True
p.start()
logging.info("Start subprocess with PID %d", p.pid)
diff --git a/tools/edit_monitor/edit_monitor.py b/tools/edit_monitor/edit_monitor.py
index defc841..31115d4 100644
--- a/tools/edit_monitor/edit_monitor.py
+++ b/tools/edit_monitor/edit_monitor.py
@@ -19,6 +19,7 @@
import os
import pathlib
import platform
+import threading
import time
from atest.metrics import clearcut_client
@@ -31,22 +32,34 @@
# Enum of the Clearcut log source defined under
# /google3/wireless/android/play/playlog/proto/log_source_enum.proto
LOG_SOURCE = 2524
+DEFAULT_FLUSH_INTERVAL_SECONDS = 5
+DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100
class ClearcutEventHandler(PatternMatchingEventHandler):
def __init__(
- self, path: str, cclient: clearcut_client.Clearcut | None = None
+ self,
+ path: str,
+ flush_interval_sec: int,
+ single_events_size_threshold: int,
+ cclient: clearcut_client.Clearcut | None = None,
):
super().__init__(patterns=["*"], ignore_directories=True)
self.root_monitoring_path = path
+ self.flush_interval_sec = flush_interval_sec
+ self.single_events_size_threshold = single_events_size_threshold
self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE)
self.user_name = getpass.getuser()
self.host_name = platform.node()
self.source_root = os.environ.get("ANDROID_BUILD_TOP", "")
+ self.pending_events = []
+ self._scheduled_log_thread = None
+ self._pending_events_lock = threading.Lock()
+
def on_moved(self, event: FileSystemEvent):
self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE)
@@ -61,6 +74,12 @@
def flushall(self):
logging.info("flushing all pending events.")
+ if self._scheduled_log_thread:
+ logging.info("canceling log thread")
+ self._scheduled_log_thread.cancel()
+ self._scheduled_log_thread = None
+
+ self._log_clearcut_events()
self.cclient.flush_events()
def _log_edit_event(
@@ -92,12 +111,17 @@
file_path=event.src_path, edit_type=edit_type
)
)
- clearcut_log_event = clientanalytics_pb2.LogEvent(
- event_time_ms=int(event_time * 1000),
- source_extension=event_proto.SerializeToString(),
- )
+ with self._pending_events_lock:
+ self.pending_events.append((event_proto, event_time))
+ if not self._scheduled_log_thread:
+ logging.debug(
+ "Scheduling thread to run in %d seconds", self.flush_interval_sec
+ )
+ self._scheduled_log_thread = threading.Timer(
+ self.flush_interval_sec, self._log_clearcut_events
+ )
+ self._scheduled_log_thread.start()
- self.cclient.log(clearcut_log_event)
except Exception:
logging.exception("Failed to log edit event.")
@@ -114,9 +138,46 @@
for dir in file_path.relative_to(root_path).parents
)
+ def _log_clearcut_events(self):
+ with self._pending_events_lock:
+ self._scheduled_log_thread = None
+ edit_events = self.pending_events
+ self.pending_events = []
+
+ pending_events_size = len(edit_events)
+ if pending_events_size > self.single_events_size_threshold:
+ logging.info(
+ "got %d events in %d seconds, sending aggregated events instead",
+ pending_events_size,
+ self.flush_interval_sec,
+ )
+ aggregated_event_time = edit_events[0][1]
+ aggregated_event_proto = edit_event_pb2.EditEvent(
+ user_name=self.user_name,
+ host_name=self.host_name,
+ source_root=self.source_root,
+ )
+ aggregated_event_proto.aggregated_edit_event.CopyFrom(
+ edit_event_pb2.EditEvent.AggregatedEditEvent(
+ num_edits=pending_events_size
+ )
+ )
+ edit_events = [(aggregated_event_proto, aggregated_event_time)]
+
+ for event_proto, event_time in edit_events:
+ log_event = clientanalytics_pb2.LogEvent(
+ event_time_ms=int(event_time * 1000),
+ source_extension=event_proto.SerializeToString(),
+ )
+ self.cclient.log(log_event)
+
+ logging.info("sent %d edit events", len(edit_events))
+
def start(
path: str,
+ flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS,
+ single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD,
cclient: clearcut_client.Clearcut | None = None,
pipe_sender: multiprocessing.connection.Connection | None = None,
):
@@ -130,7 +191,8 @@
cclient: The clearcut client to send the edit logs.
conn: the sender of the pipe to communicate with the deamon manager.
"""
- event_handler = ClearcutEventHandler(path, cclient)
+ event_handler = ClearcutEventHandler(
+ path, flush_interval_sec, single_events_size_threshold, cclient)
observer = Observer()
logging.info("Starting observer on path %s.", path)
diff --git a/tools/edit_monitor/edit_monitor_test.py b/tools/edit_monitor/edit_monitor_test.py
index 5bc1b13..4ec3284 100644
--- a/tools/edit_monitor/edit_monitor_test.py
+++ b/tools/edit_monitor/edit_monitor_test.py
@@ -53,7 +53,7 @@
self.working_dir.cleanup()
super().tearDown()
- def test_log_edit_event_success(self):
+ def test_log_single_edit_event_success(self):
# Create the .git file under the monitoring dir.
self.root_monitoring_path.joinpath('.git').touch()
fake_cclient = FakeClearcutClient(
@@ -127,6 +127,42 @@
).single_edit_event,
)
+
+ def test_log_aggregated_edit_event_success(self):
+ # Create the .git file under the monitoring dir.
+ self.root_monitoring_path.joinpath('.git').touch()
+ fake_cclient = FakeClearcutClient(
+ log_output_file=self.log_event_dir.joinpath('logs.output')
+ )
+ p = self._start_test_edit_monitor_process(fake_cclient)
+
+ # Create 6 test files
+ for i in range(6):
+ test_file = self.root_monitoring_path.joinpath('test_' + str(i))
+ test_file.touch()
+
+ # Give the edit monitor some time to receive the edit events.
+ time.sleep(1)
+ # Stop the edit monitor and flush all events.
+ os.kill(p.pid, signal.SIGINT)
+ p.join()
+
+ logged_events = self._get_logged_events()
+ self.assertEqual(len(logged_events), 1)
+
+ expected_aggregated_edit_event = (
+ edit_event_pb2.EditEvent.AggregatedEditEvent(
+ num_edits=6,
+ )
+ )
+
+ self.assertEqual(
+ expected_aggregated_edit_event,
+ edit_event_pb2.EditEvent.FromString(
+ logged_events[0].source_extension
+ ).aggregated_edit_event,
+ )
+
def test_do_not_log_edit_event_for_directory_change(self):
# Create the .git file under the monitoring dir.
self.root_monitoring_path.joinpath('.git').touch()
@@ -217,7 +253,7 @@
# Start edit monitor in a subprocess.
p = multiprocessing.Process(
target=edit_monitor.start,
- args=(str(self.root_monitoring_path.resolve()), cclient, sender),
+ args=(str(self.root_monitoring_path.resolve()), 0.5, 5, cclient, sender),
)
p.daemon = True
p.start()