blob: ab528e870fb52fef1018ae5fea3126782af3cac0 [file] [log] [blame]
Zhuoyao Zhangbae5f722024-09-20 16:53:59 +00001# Copyright 2024, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16import getpass
17import logging
18import multiprocessing.connection
19import os
Zhuoyao Zhang35bd3d22024-10-01 00:16:49 +000020import pathlib
Zhuoyao Zhangbae5f722024-09-20 16:53:59 +000021import platform
Zhuoyao Zhang8a225792024-10-09 18:04:39 +000022import threading
Zhuoyao Zhangbae5f722024-09-20 16:53:59 +000023import time
24
25from atest.metrics import clearcut_client
26from atest.proto import clientanalytics_pb2
27from proto import edit_event_pb2
28from watchdog.events import FileSystemEvent
29from watchdog.events import PatternMatchingEventHandler
30from watchdog.observers import Observer
31
# Clearcut log source ID for edit monitor events, as defined in
# /google3/wireless/android/play/playlog/proto/log_source_enum.proto.
LOG_SOURCE = 2524

# Batching defaults used by start(): buffered edit events are sent once the
# flush interval elapses, and a batch with more events than the threshold is
# collapsed into one aggregated event.
DEFAULT_FLUSH_INTERVAL_SECONDS = 5
DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100
Zhuoyao Zhangbae5f722024-09-20 16:53:59 +000037
38
class ClearcutEventHandler(PatternMatchingEventHandler):
  """Watchdog event handler that reports file edits to Clearcut.

  Events for hidden paths, or for files outside any git project under the
  monitored root, are ignored. Accepted events are buffered and sent in a
  batch by a timer that fires flush_interval_sec seconds after the first
  buffered event. A batch with more than single_events_size_threshold events
  is replaced by a single aggregated event that only carries the edit count.
  """

  def __init__(
      self,
      path: str,
      flush_interval_sec: int,
      single_events_size_threshold: int,
      is_dry_run: bool = False,
      cclient: clearcut_client.Clearcut | None = None,
  ):
    """Initializes the handler.

    Args:
      path: The root path being monitored.
      flush_interval_sec: Seconds between buffering the first event of a batch
        and sending that batch.
      single_events_size_threshold: Batches with more events than this are
        sent as one aggregated event.
      is_dry_run: If True, batches are only counted in the local log and are
        not sent to Clearcut.
      cclient: The Clearcut client to log to; when None, a client for
        LOG_SOURCE is created.
    """
    super().__init__(patterns=["*"], ignore_directories=True)
    self.root_monitoring_path = path
    self.flush_interval_sec = flush_interval_sec
    self.single_events_size_threshold = single_events_size_threshold
    self.is_dry_run = is_dry_run
    self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE)

    self.user_name = getpass.getuser()
    self.host_name = platform.node()
    self.source_root = os.environ.get("ANDROID_BUILD_TOP", "")

    # (EditEvent proto, time.time() at capture) pairs waiting to be sent.
    # Guarded by _pending_events_lock.
    self.pending_events = []
    # Timer that will send the current batch, or None when none is scheduled.
    # Guarded by _pending_events_lock.
    self._scheduled_log_thread = None
    self._pending_events_lock = threading.Lock()

  def on_moved(self, event: FileSystemEvent):
    self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE)

  def on_created(self, event: FileSystemEvent):
    self._log_edit_event(event, edit_event_pb2.EditEvent.CREATE)

  def on_deleted(self, event: FileSystemEvent):
    self._log_edit_event(event, edit_event_pb2.EditEvent.DELETE)

  def on_modified(self, event: FileSystemEvent):
    self._log_edit_event(event, edit_event_pb2.EditEvent.MODIFY)

  def flushall(self):
    """Sends all pending events now and flushes the Clearcut client.

    Any scheduled send timer is canceled. If that timer's callback is already
    running, this waits for it to finish, so its events are included in the
    final cclient.flush_events() call.
    """
    logging.info("flushing all pending events.")
    # _scheduled_log_thread is otherwise only read or written under the lock,
    # so detach it under the lock as well.
    with self._pending_events_lock:
      scheduled_thread = self._scheduled_log_thread
      self._scheduled_log_thread = None

    if scheduled_thread:
      logging.info("canceling log thread")
      scheduled_thread.cancel()
      # cancel() has no effect once the callback has started. Join outside the
      # lock (the callback acquires it), so an in-flight batch is logged before
      # flush_events() below.
      if scheduled_thread is not threading.current_thread():
        scheduled_thread.join()

    self._log_clearcut_events()
    self.cclient.flush_events()

  def _log_edit_event(
      self, event: FileSystemEvent, edit_type: edit_event_pb2.EditEvent.EditType
  ):
    """Filters one file-system event and buffers it for sending.

    Hidden paths and files outside any git project under the root are
    skipped. An accepted event is appended to pending_events, and a send
    timer is started if none is scheduled. Any exception is logged and
    swallowed, so a failure on one event does not propagate to the caller.
    """
    try:
      # Capture the time before filtering so it reflects when the event was
      # handled.
      event_time = time.time()

      if self._is_hidden_file(pathlib.Path(event.src_path)):
        logging.debug("ignore hidden file: %s.", event.src_path)
        return

      if not self._is_under_git_project(pathlib.Path(event.src_path)):
        logging.debug(
            "ignore file %s which does not belong to a git project",
            event.src_path,
        )
        return

      logging.info("%s: %s", event.event_type, event.src_path)

      event_proto = edit_event_pb2.EditEvent(
          user_name=self.user_name,
          host_name=self.host_name,
          source_root=self.source_root,
      )
      event_proto.single_edit_event.CopyFrom(
          edit_event_pb2.EditEvent.SingleEditEvent(
              file_path=event.src_path, edit_type=edit_type
          )
      )
      with self._pending_events_lock:
        self.pending_events.append((event_proto, event_time))
        if not self._scheduled_log_thread:
          logging.debug(
              "Scheduling thread to run in %d seconds", self.flush_interval_sec
          )
          self._scheduled_log_thread = threading.Timer(
              self.flush_interval_sec, self._log_clearcut_events
          )
          self._scheduled_log_thread.start()

    except Exception:
      logging.exception("Failed to log edit event.")

  def _is_hidden_file(self, file_path: pathlib.Path) -> bool:
    """Returns True if any path component below the root starts with '.'."""
    return any(
        part.startswith(".")
        for part in file_path.relative_to(self.root_monitoring_path).parts
    )

  def _is_under_git_project(self, file_path: pathlib.Path) -> bool:
    """Returns True if the file lies inside a git project under the root.

    Every ancestor directory of file_path, from the monitoring root
    (inclusive) down to the file's parent, is checked for a '.git' entry.
    """
    root_path = pathlib.Path(self.root_monitoring_path)
    # Relativize against the same unresolved root that _is_hidden_file uses.
    # src_path is built from the path given to observer.schedule(). Resolving
    # only the root made relative_to() raise ValueError whenever the root was
    # relative or went through a symlink.
    relative_path = file_path.relative_to(root_path)
    return any(
        (root_path / parent / ".git").exists()
        for parent in relative_path.parents
    )

  def _log_clearcut_events(self):
    """Sends the current batch of pending events to Clearcut.

    Runs on the timer thread, or directly from flushall(). The batch is taken
    atomically under the lock. If it holds more than
    single_events_size_threshold events, it is replaced by one aggregated
    event that carries the edit count and the first event's time. In dry-run
    mode, only the event count is logged.
    """
    with self._pending_events_lock:
      self._scheduled_log_thread = None
      edit_events = self.pending_events
      self.pending_events = []

    pending_events_size = len(edit_events)
    if pending_events_size > self.single_events_size_threshold:
      logging.info(
          "got %d events in %d seconds, sending aggregated events instead",
          pending_events_size,
          self.flush_interval_sec,
      )
      aggregated_event_time = edit_events[0][1]
      aggregated_event_proto = edit_event_pb2.EditEvent(
          user_name=self.user_name,
          host_name=self.host_name,
          source_root=self.source_root,
      )
      aggregated_event_proto.aggregated_edit_event.CopyFrom(
          edit_event_pb2.EditEvent.AggregatedEditEvent(
              num_edits=pending_events_size
          )
      )
      edit_events = [(aggregated_event_proto, aggregated_event_time)]

    if self.is_dry_run:
      logging.info("Sent %d edit events in dry run.", len(edit_events))
      return

    for event_proto, event_time in edit_events:
      log_event = clientanalytics_pb2.LogEvent(
          # time.time() is in seconds; Clearcut expects milliseconds.
          event_time_ms=int(event_time * 1000),
          source_extension=event_proto.SerializeToString(),
      )
      self.cclient.log(log_event)

    logging.info("sent %d edit events", len(edit_events))
181
Zhuoyao Zhangbae5f722024-09-20 16:53:59 +0000182
def start(
    path: str,
    is_dry_run: bool = False,
    flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS,
    single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD,
    cclient: clearcut_client.Clearcut | None = None,
    pipe_sender: multiprocessing.connection.Connection | None = None,
):
  """Starts the edit monitor and blocks until the wait loop is interrupted.

  This is the entry point for running the edit monitor as a subprocess of the
  daemon manager. The wait loop only exits via an exception (for example
  KeyboardInterrupt). On exit, the observer is stopped, all pending edit
  events are flushed, and the pipe is closed. Each cleanup step runs even if
  an earlier one fails.

  Args:
    path: The root path to monitor.
    is_dry_run: If True, edit events are counted in the local log but not
      sent to Clearcut.
    flush_interval_sec: Seconds to batch edit events before sending them.
    single_events_size_threshold: Batches with more events than this are sent
      as a single aggregated event.
    cclient: The Clearcut client to send the edit logs; a default client is
      created when None.
    pipe_sender: The sender end of the pipe used to communicate with the
      daemon manager. It receives "Observer started." once monitoring begins.
  """
  event_handler = ClearcutEventHandler(
      path,
      flush_interval_sec,
      single_events_size_threshold,
      is_dry_run=is_dry_run,
      cclient=cclient,
  )
  observer = Observer()

  logging.info("Starting observer on path %s.", path)
  observer.schedule(event_handler, path, recursive=True)
  observer.start()
  logging.info("Observer started.")
  if pipe_sender:
    pipe_sender.send("Observer started.")

  try:
    while True:
      time.sleep(1)
  finally:
    try:
      # Stop watching before the final flush, so no new events can be queued
      # (and left unflushed on a timer) after flushall() runs.
      observer.stop()
      observer.join()
    finally:
      try:
        event_handler.flushall()
      finally:
        # Always release the pipe, even if stopping or flushing failed.
        if pipe_sender:
          pipe_sender.close()
220 pipe_sender.close()