Merge "Performance optimization for edit monitor" into main
diff --git a/tools/edit_monitor/daemon_manager.py b/tools/edit_monitor/daemon_manager.py
index 892c292..4ff4ec8 100644
--- a/tools/edit_monitor/daemon_manager.py
+++ b/tools/edit_monitor/daemon_manager.py
@@ -133,8 +133,12 @@

     logging.debug("in daemon manager cleanup.")
     try:
-      if self.daemon_process and self.daemon_process.is_alive():
-        self._terminate_process(self.daemon_process.pid)
+      if self.daemon_process:
+        # The daemon process might already be terminating; wait briefly for it
+        # to exit on its own before killing it explicitly.
+        self._wait_for_process_terminate(self.daemon_process.pid, 1)
+        if self.daemon_process.is_alive():
+          self._terminate_process(self.daemon_process.pid)
       self._remove_pidfile()
       logging.debug("Successfully stopped daemon manager.")
     except Exception as e:
@@ -227,6 +231,7 @@
     p = multiprocessing.Process(
         target=self.daemon_target, args=self.daemon_args
     )
+    p.daemon = True  # Terminated automatically when this process exits.
     p.start()

     logging.info("Start subprocess with PID %d", p.pid)
diff --git a/tools/edit_monitor/edit_monitor.py b/tools/edit_monitor/edit_monitor.py
index defc841..31115d4 100644
--- a/tools/edit_monitor/edit_monitor.py
+++ b/tools/edit_monitor/edit_monitor.py
@@ -19,6 +19,7 @@
 import os
 import pathlib
 import platform
+import threading
 import time
 
 from atest.metrics import clearcut_client
@@ -31,22 +32,50 @@
 # Enum of the Clearcut log source defined under
 # /google3/wireless/android/play/playlog/proto/log_source_enum.proto
 LOG_SOURCE = 2524
+# Default number of seconds edit events are buffered before being sent.
+DEFAULT_FLUSH_INTERVAL_SECONDS = 5
+# Default maximum number of edit events sent individually per flush; larger
+# batches are sent as a single aggregated event.
+DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100


 class ClearcutEventHandler(PatternMatchingEventHandler):

   def __init__(
-      self, path: str, cclient: clearcut_client.Clearcut | None = None
+      self,
+      path: str,
+      flush_interval_sec: float,
+      single_events_size_threshold: int,
+      cclient: clearcut_client.Clearcut | None = None,
   ):
+    """Initializes the handler.
+
+    Args:
+      path: The root path being monitored.
+      flush_interval_sec: Seconds to buffer edit events before sending them.
+      single_events_size_threshold: When more than this many events are
+        pending at send time, one aggregated event is sent instead.
+      cclient: The clearcut client used to send events; a new one for
+        LOG_SOURCE is created if not provided.
+    """

     super().__init__(patterns=["*"], ignore_directories=True)
     self.root_monitoring_path = path
+    self.flush_interval_sec = flush_interval_sec
+    self.single_events_size_threshold = single_events_size_threshold
     self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE)

     self.user_name = getpass.getuser()
     self.host_name = platform.node()
     self.source_root = os.environ.get("ANDROID_BUILD_TOP", "")

+    # Edit events waiting to be sent, as (EditEvent proto, event time) tuples.
+    self.pending_events = []
+    # Timer that sends pending_events; None when no send is scheduled.
+    self._scheduled_log_thread = None
+    # Guards pending_events and _scheduled_log_thread across threads.
+    self._pending_events_lock = threading.Lock()
+
   def on_moved(self, event: FileSystemEvent):
     self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE)

@@ -61,6 +74,21 @@

   def flushall(self):
+    """Sends all pending edit events and flushes the clearcut client."""
     logging.info("flushing all pending events.")
+    # Detach the scheduled timer under the lock so it cannot be swapped out by
+    # a concurrent _log_edit_event call while it is being canceled.
+    with self._pending_events_lock:
+      scheduled_log_thread = self._scheduled_log_thread
+      self._scheduled_log_thread = None
+
+    if scheduled_log_thread:
+      logging.info("canceling log thread")
+      scheduled_log_thread.cancel()
+      # cancel() has no effect once the timer callback has started; wait for
+      # it so its events reach the client before flush_events() below.
+      scheduled_log_thread.join()
+
+    self._log_clearcut_events()
     self.cclient.flush_events()

   def _log_edit_event(
@@ -92,12 +111,19 @@
               file_path=event.src_path, edit_type=edit_type
           )
       )
-      clearcut_log_event = clientanalytics_pb2.LogEvent(
-          event_time_ms=int(event_time * 1000),
-          source_extension=event_proto.SerializeToString(),
-      )
+      # Buffer the event; a single timer sends everything buffered once
+      # flush_interval_sec elapses.
+      with self._pending_events_lock:
+        self.pending_events.append((event_proto, event_time))
+        if not self._scheduled_log_thread:
+          logging.debug(
+              "Scheduling thread to run in %s seconds", self.flush_interval_sec
+          )
+          self._scheduled_log_thread = threading.Timer(
+              self.flush_interval_sec, self._log_clearcut_events
+          )
+          self._scheduled_log_thread.start()

-      self.cclient.log(clearcut_log_event)
     except Exception:
       logging.exception("Failed to log edit event.")

@@ -114,9 +138,63 @@
         for dir in file_path.relative_to(root_path).parents
     )

+  def _log_clearcut_events(self):
+    """Sends all pending edit events to clearcut.
+
+    Called by the timer scheduled in _log_edit_event and by flushall(). When
+    more than single_events_size_threshold events are pending, a single
+    aggregated event carrying the edit count is sent instead of the
+    individual events.
+    """
+    with self._pending_events_lock:
+      self._scheduled_log_thread = None
+      edit_events = self.pending_events
+      self.pending_events = []
+
+    if not edit_events:
+      return
+
+    pending_events_size = len(edit_events)
+    if pending_events_size > self.single_events_size_threshold:
+      logging.info(
+          "got %d events in %s seconds, sending aggregated events instead",
+          pending_events_size,
+          self.flush_interval_sec,
+      )
+      # The aggregated event is timestamped with the first buffered edit.
+      aggregated_event_time = edit_events[0][1]
+      aggregated_event_proto = edit_event_pb2.EditEvent(
+          user_name=self.user_name,
+          host_name=self.host_name,
+          source_root=self.source_root,
+      )
+      aggregated_event_proto.aggregated_edit_event.CopyFrom(
+          edit_event_pb2.EditEvent.AggregatedEditEvent(
+              num_edits=pending_events_size
+          )
+      )
+      edit_events = [(aggregated_event_proto, aggregated_event_time)]
+
+    try:
+      for event_proto, event_time in edit_events:
+        log_event = clientanalytics_pb2.LogEvent(
+            event_time_ms=int(event_time * 1000),
+            source_extension=event_proto.SerializeToString(),
+        )
+        self.cclient.log(log_event)
+    except Exception:
+      # Runs on a timer thread or during flushall(); a failure here must not
+      # kill the thread or prevent flushall() from flushing the client.
+      logging.exception("Failed to log clearcut events.")
+      return
+
+    logging.info("sent %d edit events", len(edit_events))
+

 def start(
     path: str,
+    flush_interval_sec: float = DEFAULT_FLUSH_INTERVAL_SECONDS,
+    single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD,
     cclient: clearcut_client.Clearcut | None = None,
     pipe_sender: multiprocessing.connection.Connection | None = None,
 ):
@@ -130,7 +191,13 @@
+    flush_interval_sec: How long, in seconds, edit events are buffered before
+      being sent.
+    single_events_size_threshold: When more than this many events are
+      buffered, a single aggregated event is sent instead.
     cclient: The clearcut client to send the edit logs.
-    conn: the sender of the pipe to communicate with the deamon manager.
+    pipe_sender: The sender of the pipe to communicate with the daemon manager.
   """
-  event_handler = ClearcutEventHandler(path, cclient)
+  event_handler = ClearcutEventHandler(
+      path, flush_interval_sec, single_events_size_threshold, cclient
+  )
   observer = Observer()

   logging.info("Starting observer on path %s.", path)
diff --git a/tools/edit_monitor/edit_monitor_test.py b/tools/edit_monitor/edit_monitor_test.py
index 5bc1b13..4ec3284 100644
--- a/tools/edit_monitor/edit_monitor_test.py
+++ b/tools/edit_monitor/edit_monitor_test.py
@@ -53,7 +53,7 @@
     self.working_dir.cleanup()
     super().tearDown()
 
-  def test_log_edit_event_success(self):
+  def test_log_single_edit_event_success(self):
     # Create the .git file under the monitoring dir.
     self.root_monitoring_path.joinpath('.git').touch()
     fake_cclient = FakeClearcutClient(
@@ -127,6 +127,42 @@
           ).single_edit_event,
       )

+  def test_log_aggregated_edit_event_success(self):
+    # Create the .git file under the monitoring dir.
+    self.root_monitoring_path.joinpath('.git').touch()
+    fake_cclient = FakeClearcutClient(
+        log_output_file=self.log_event_dir.joinpath('logs.output')
+    )
+    p = self._start_test_edit_monitor_process(fake_cclient)
+
+    # Create more files than the single events size threshold (5) used by
+    # _start_test_edit_monitor_process so the events are aggregated.
+    num_test_files = 6
+    for i in range(num_test_files):
+      self.root_monitoring_path.joinpath(f'test_{i}').touch()
+
+    # Give some time for the edit monitor to receive the edit events.
+    time.sleep(1)
+    # Stop the edit monitor and flush all events.
+    os.kill(p.pid, signal.SIGINT)
+    p.join()
+
+    logged_events = self._get_logged_events()
+    self.assertEqual(len(logged_events), 1)
+
+    expected_aggregated_edit_event = (
+        edit_event_pb2.EditEvent.AggregatedEditEvent(
+            num_edits=num_test_files,
+        )
+    )
+
+    self.assertEqual(
+        expected_aggregated_edit_event,
+        edit_event_pb2.EditEvent.FromString(
+            logged_events[0].source_extension
+        ).aggregated_edit_event,
+    )
+
   def test_do_not_log_edit_event_for_directory_change(self):
     # Create the .git file under the monitoring dir.
     self.root_monitoring_path.joinpath('.git').touch()
@@ -217,7 +253,13 @@
     # Start edit monitor in a subprocess.
     p = multiprocessing.Process(
         target=edit_monitor.start,
-        args=(str(self.root_monitoring_path.resolve()), cclient, sender),
+        args=(str(self.root_monitoring_path.resolve()),),
+        kwargs={
+            'flush_interval_sec': 0.5,
+            'single_events_size_threshold': 5,
+            'cclient': cclient,
+            'pipe_sender': sender,
+        },
     )
     p.daemon = True
     p.start()