Merge "Performance optimization for edit monitor" into main
diff --git a/tools/edit_monitor/daemon_manager.py b/tools/edit_monitor/daemon_manager.py
index 892c292..4ff4ec8 100644
--- a/tools/edit_monitor/daemon_manager.py
+++ b/tools/edit_monitor/daemon_manager.py
@@ -133,8 +133,12 @@
logging.debug("in daemon manager cleanup.")
try:
- if self.daemon_process and self.daemon_process.is_alive():
- self._terminate_process(self.daemon_process.pid)
+ if self.daemon_process:
+ # The daemon process might already be terminating, so wait briefly
+ # before killing it explicitly.
+ self._wait_for_process_terminate(self.daemon_process.pid, 1)
+ if self.daemon_process.is_alive():
+ self._terminate_process(self.daemon_process.pid)
self._remove_pidfile()
logging.debug("Successfully stopped daemon manager.")
except Exception as e:
@@ -227,6 +231,7 @@
p = multiprocessing.Process(
target=self.daemon_target, args=self.daemon_args
)
+ p.daemon = True
p.start()
logging.info("Start subprocess with PID %d", p.pid)
diff --git a/tools/edit_monitor/edit_monitor.py b/tools/edit_monitor/edit_monitor.py
index defc841..31115d4 100644
--- a/tools/edit_monitor/edit_monitor.py
+++ b/tools/edit_monitor/edit_monitor.py
@@ -19,6 +19,7 @@
import os
import pathlib
import platform
+import threading
import time
from atest.metrics import clearcut_client
@@ -31,22 +32,34 @@
# Enum of the Clearcut log source defined under
# /google3/wireless/android/play/playlog/proto/log_source_enum.proto
LOG_SOURCE = 2524
+DEFAULT_FLUSH_INTERVAL_SECONDS = 5
+DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100
class ClearcutEventHandler(PatternMatchingEventHandler):
def __init__(
- self, path: str, cclient: clearcut_client.Clearcut | None = None
+ self,
+ path: str,
+ flush_interval_sec: int,
+ single_events_size_threshold: int,
+ cclient: clearcut_client.Clearcut | None = None,
):
super().__init__(patterns=["*"], ignore_directories=True)
self.root_monitoring_path = path
+ self.flush_interval_sec = flush_interval_sec
+ self.single_events_size_threshold = single_events_size_threshold
self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE)
self.user_name = getpass.getuser()
self.host_name = platform.node()
self.source_root = os.environ.get("ANDROID_BUILD_TOP", "")
+ self.pending_events = []
+ self._scheduled_log_thread = None
+ self._pending_events_lock = threading.Lock()
+
def on_moved(self, event: FileSystemEvent):
self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE)
@@ -61,6 +74,12 @@
def flushall(self):
logging.info("flushing all pending events.")
+ if self._scheduled_log_thread:
+ logging.info("canceling log thread")
+ self._scheduled_log_thread.cancel()
+ self._scheduled_log_thread = None
+
+ self._log_clearcut_events()
self.cclient.flush_events()
def _log_edit_event(
@@ -92,12 +111,17 @@
file_path=event.src_path, edit_type=edit_type
)
)
- clearcut_log_event = clientanalytics_pb2.LogEvent(
- event_time_ms=int(event_time * 1000),
- source_extension=event_proto.SerializeToString(),
- )
+ with self._pending_events_lock:
+ self.pending_events.append((event_proto, event_time))
+ if not self._scheduled_log_thread:
+ logging.debug(
+ "Scheduling thread to run in %d seconds", self.flush_interval_sec
+ )
+ self._scheduled_log_thread = threading.Timer(
+ self.flush_interval_sec, self._log_clearcut_events
+ )
+ self._scheduled_log_thread.start()
- self.cclient.log(clearcut_log_event)
except Exception:
logging.exception("Failed to log edit event.")
@@ -114,9 +138,46 @@
for dir in file_path.relative_to(root_path).parents
)
+ def _log_clearcut_events(self):
+ with self._pending_events_lock:
+ self._scheduled_log_thread = None
+ edit_events = self.pending_events
+ self.pending_events = []
+
+ pending_events_size = len(edit_events)
+ if pending_events_size > self.single_events_size_threshold:
+ logging.info(
+ "got %d events in %d seconds, sending aggregated events instead",
+ pending_events_size,
+ self.flush_interval_sec,
+ )
+ aggregated_event_time = edit_events[0][1]
+ aggregated_event_proto = edit_event_pb2.EditEvent(
+ user_name=self.user_name,
+ host_name=self.host_name,
+ source_root=self.source_root,
+ )
+ aggregated_event_proto.aggregated_edit_event.CopyFrom(
+ edit_event_pb2.EditEvent.AggregatedEditEvent(
+ num_edits=pending_events_size
+ )
+ )
+ edit_events = [(aggregated_event_proto, aggregated_event_time)]
+
+ for event_proto, event_time in edit_events:
+ log_event = clientanalytics_pb2.LogEvent(
+ event_time_ms=int(event_time * 1000),
+ source_extension=event_proto.SerializeToString(),
+ )
+ self.cclient.log(log_event)
+
+ logging.info("sent %d edit events", len(edit_events))
+
def start(
path: str,
+ flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS,
+ single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD,
cclient: clearcut_client.Clearcut | None = None,
pipe_sender: multiprocessing.connection.Connection | None = None,
):
@@ -130,7 +191,8 @@
cclient: The clearcut client to send the edit logs.
conn: the sender of the pipe to communicate with the deamon manager.
"""
- event_handler = ClearcutEventHandler(path, cclient)
+ event_handler = ClearcutEventHandler(
+ path, flush_interval_sec, single_events_size_threshold, cclient)
observer = Observer()
logging.info("Starting observer on path %s.", path)
diff --git a/tools/edit_monitor/edit_monitor_test.py b/tools/edit_monitor/edit_monitor_test.py
index 5bc1b13..4ec3284 100644
--- a/tools/edit_monitor/edit_monitor_test.py
+++ b/tools/edit_monitor/edit_monitor_test.py
@@ -53,7 +53,7 @@
self.working_dir.cleanup()
super().tearDown()
- def test_log_edit_event_success(self):
+ def test_log_single_edit_event_success(self):
# Create the .git file under the monitoring dir.
self.root_monitoring_path.joinpath('.git').touch()
fake_cclient = FakeClearcutClient(
@@ -127,6 +127,42 @@
).single_edit_event,
)
+
+ def test_log_aggregated_edit_event_success(self):
+ # Create the .git file under the monitoring dir.
+ self.root_monitoring_path.joinpath('.git').touch()
+ fake_cclient = FakeClearcutClient(
+ log_output_file=self.log_event_dir.joinpath('logs.output')
+ )
+ p = self._start_test_edit_monitor_process(fake_cclient)
+
+ # Create 6 test files, exceeding the threshold of 5 set for the test monitor.
+ for i in range(6):
+ test_file = self.root_monitoring_path.joinpath('test_' + str(i))
+ test_file.touch()
+
+ # Give some time for the edit monitor to receive the edit event.
+ time.sleep(1)
+ # Stop the edit monitor and flush all events.
+ os.kill(p.pid, signal.SIGINT)
+ p.join()
+
+ logged_events = self._get_logged_events()
+ self.assertEqual(len(logged_events), 1)
+
+ expected_aggregated_edit_event = (
+ edit_event_pb2.EditEvent.AggregatedEditEvent(
+ num_edits=6,
+ )
+ )
+
+ self.assertEqual(
+ expected_aggregated_edit_event,
+ edit_event_pb2.EditEvent.FromString(
+ logged_events[0].source_extension
+ ).aggregated_edit_event,
+ )
+
def test_do_not_log_edit_event_for_directory_change(self):
# Create the .git file under the monitoring dir.
self.root_monitoring_path.joinpath('.git').touch()
@@ -217,7 +253,7 @@
# Start edit monitor in a subprocess.
p = multiprocessing.Process(
target=edit_monitor.start,
- args=(str(self.root_monitoring_path.resolve()), cclient, sender),
+ args=(str(self.root_monitoring_path.resolve()), 0.5, 5, cclient, sender),
)
p.daemon = True
p.start()