blob: 31115d4cb4963ac4c31bb57623d02f62b9fda763 [file] [log] [blame]
# Copyright 2024, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15
16import getpass
17import logging
18import multiprocessing.connection
19import os
Zhuoyao Zhang35bd3d22024-10-01 00:16:49 +000020import pathlib
Zhuoyao Zhangbae5f722024-09-20 16:53:59 +000021import platform
Zhuoyao Zhang8a225792024-10-09 18:04:39 +000022import threading
Zhuoyao Zhangbae5f722024-09-20 16:53:59 +000023import time
24
25from atest.metrics import clearcut_client
26from atest.proto import clientanalytics_pb2
27from proto import edit_event_pb2
28from watchdog.events import FileSystemEvent
29from watchdog.events import PatternMatchingEventHandler
30from watchdog.observers import Observer
31
# Enum of the Clearcut log source defined under
# /google3/wireless/android/play/playlog/proto/log_source_enum.proto
LOG_SOURCE = 2524
# Default delay, in seconds, between the first buffered edit event and the
# timer-driven flush of the buffer to the Clearcut client.
DEFAULT_FLUSH_INTERVAL_SECONDS = 5
# Default cap on the number of events sent individually per flush; a flush
# holding more events than this is sent as one aggregated event instead.
DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100
Zhuoyao Zhangbae5f722024-09-20 16:53:59 +000037
38
class ClearcutEventHandler(PatternMatchingEventHandler):
    """Watchdog handler that buffers file edit events and logs them to Clearcut.

    Every non-directory event is turned into an ``EditEvent`` proto and
    appended to an in-memory buffer. The first event added to an empty
    schedule starts a one-shot ``threading.Timer``; when it fires the buffer
    is drained and handed to the Clearcut client. If a single drain holds
    more events than ``single_events_size_threshold``, one aggregated event
    carrying only the count is sent instead of the individual events.

    Events for paths with a dot-prefixed component (relative to the
    monitored root) or with no ``.git`` entry in any ancestor directory up
    to the root are ignored.
    """

    def __init__(
        self,
        path: str,
        flush_interval_sec: int,
        single_events_size_threshold: int,
        cclient: clearcut_client.Clearcut | None = None,
    ):
        """Initializes the handler.

        Args:
            path: Root path being monitored; event paths are interpreted
                relative to it.
            flush_interval_sec: Delay of the timer that flushes buffered
                events, in seconds.
            single_events_size_threshold: Maximum number of buffered events
                that are still sent one by one; above it a single
                aggregated event is sent.
            cclient: Clearcut client used for logging. When omitted, a
                client is created for ``LOG_SOURCE``.
        """
        super().__init__(patterns=["*"], ignore_directories=True)
        self.root_monitoring_path = path
        self.flush_interval_sec = flush_interval_sec
        self.single_events_size_threshold = single_events_size_threshold
        self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE)

        self.user_name = getpass.getuser()
        self.host_name = platform.node()
        self.source_root = os.environ.get("ANDROID_BUILD_TOP", "")

        # Buffer of (EditEvent proto, event time in seconds) tuples waiting
        # to be sent. Guarded by _pending_events_lock together with
        # _scheduled_log_thread.
        self.pending_events = []
        self._scheduled_log_thread = None
        self._pending_events_lock = threading.Lock()

    def on_moved(self, event: FileSystemEvent):
        """Records a MOVE edit for the event's source path."""
        self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE)

    def on_created(self, event: FileSystemEvent):
        """Records a CREATE edit for the event's source path."""
        self._log_edit_event(event, edit_event_pb2.EditEvent.CREATE)

    def on_deleted(self, event: FileSystemEvent):
        """Records a DELETE edit for the event's source path."""
        self._log_edit_event(event, edit_event_pb2.EditEvent.DELETE)

    def on_modified(self, event: FileSystemEvent):
        """Records a MODIFY edit for the event's source path."""
        self._log_edit_event(event, edit_event_pb2.EditEvent.MODIFY)

    def flushall(self):
        """Cancels any scheduled flush and sends all buffered events now."""
        logging.info("flushing all pending events.")
        # Snapshot and clear the timer under the lock: the timer thread
        # resets this attribute to None when it fires, so an unlocked
        # check-then-cancel could end up calling cancel() on None.
        with self._pending_events_lock:
            scheduled = self._scheduled_log_thread
            self._scheduled_log_thread = None
        if scheduled:
            logging.info("canceling log thread")
            scheduled.cancel()

        self._log_clearcut_events()
        self.cclient.flush_events()

    def _log_edit_event(
        self, event: FileSystemEvent, edit_type: edit_event_pb2.EditEvent.EditType
    ):
        """Buffers one edit event and schedules a flush if none is pending.

        Only ``event.src_path`` is recorded. Any exception raised while
        handling the event is logged and swallowed.

        Args:
            event: The file system event reported by watchdog.
            edit_type: The edit type to store in the proto.
        """
        try:
            event_time = time.time()
            src_path = pathlib.Path(event.src_path)

            if self._is_hidden_file(src_path):
                logging.debug("ignore hidden file: %s.", event.src_path)
                return

            if not self._is_under_git_project(src_path):
                logging.debug(
                    "ignore file %s which does not belong to a git project",
                    event.src_path,
                )
                return

            logging.info("%s: %s", event.event_type, event.src_path)

            event_proto = edit_event_pb2.EditEvent(
                user_name=self.user_name,
                host_name=self.host_name,
                source_root=self.source_root,
            )
            event_proto.single_edit_event.CopyFrom(
                edit_event_pb2.EditEvent.SingleEditEvent(
                    file_path=event.src_path, edit_type=edit_type
                )
            )
            with self._pending_events_lock:
                self.pending_events.append((event_proto, event_time))
                if not self._scheduled_log_thread:
                    logging.debug(
                        "Scheduling thread to run in %d seconds",
                        self.flush_interval_sec,
                    )
                    self._scheduled_log_thread = threading.Timer(
                        self.flush_interval_sec, self._log_clearcut_events
                    )
                    self._scheduled_log_thread.start()

        except Exception:
            logging.exception("Failed to log edit event.")

    def _is_hidden_file(self, file_path: pathlib.Path) -> bool:
        """Returns True if any path component below the root starts with '.'.

        Raises:
            ValueError: If ``file_path`` is not located under the monitored
                root.
        """
        return any(
            part.startswith(".")
            for part in file_path.relative_to(self.root_monitoring_path).parts
        )

    def _is_under_git_project(self, file_path: pathlib.Path) -> bool:
        """Returns True if an ancestor directory up to the root has a '.git'.

        The root itself is included in the search.

        Raises:
            ValueError: If ``file_path`` is not located under the monitored
                root.
        """
        # Use the root exactly as it was given, like _is_hidden_file does.
        # Resolving only the root makes relative_to() raise ValueError
        # whenever the root is relative or contains a symlink, because the
        # event path is not resolved.
        root_path = pathlib.Path(self.root_monitoring_path)
        return any(
            (root_path / parent / ".git").exists()
            for parent in file_path.relative_to(root_path).parents
        )

    def _log_clearcut_events(self):
        """Drains the buffer and logs its content to the Clearcut client.

        Runs on the timer thread and from ``flushall``. When the drained
        batch is larger than ``single_events_size_threshold`` it is replaced
        by one aggregated event stamped with the time of the first event in
        the batch.
        """
        # Swap the buffer out under the lock so the (possibly slow) logging
        # below does not block event handling.
        with self._pending_events_lock:
            self._scheduled_log_thread = None
            edit_events = self.pending_events
            self.pending_events = []

        pending_events_size = len(edit_events)
        if pending_events_size > self.single_events_size_threshold:
            logging.info(
                "got %d events in %d seconds, sending aggregated events instead",
                pending_events_size,
                self.flush_interval_sec,
            )
            aggregated_event_time = edit_events[0][1]
            aggregated_event_proto = edit_event_pb2.EditEvent(
                user_name=self.user_name,
                host_name=self.host_name,
                source_root=self.source_root,
            )
            aggregated_event_proto.aggregated_edit_event.CopyFrom(
                edit_event_pb2.EditEvent.AggregatedEditEvent(
                    num_edits=pending_events_size
                )
            )
            edit_events = [(aggregated_event_proto, aggregated_event_time)]

        for event_proto, event_time in edit_events:
            log_event = clientanalytics_pb2.LogEvent(
                event_time_ms=int(event_time * 1000),
                source_extension=event_proto.SerializeToString(),
            )
            self.cclient.log(log_event)

        logging.info("sent %d edit events", len(edit_events))
175
Zhuoyao Zhangbae5f722024-09-20 16:53:59 +0000176
def start(
    path: str,
    flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS,
    single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD,
    cclient: clearcut_client.Clearcut | None = None,
    pipe_sender: multiprocessing.connection.Connection | None = None,
):
    """Starts the edit monitor and blocks until the process is interrupted.

    This is the entry point to start the edit monitor as a subprocess of
    the daemon manager.

    Args:
        path: The root path to monitor (recursively).
        flush_interval_sec: Delay, in seconds, of the timer that flushes
            buffered edit events.
        single_events_size_threshold: Maximum number of events per flush
            that are sent individually; larger batches are sent as one
            aggregated event.
        cclient: The clearcut client to send the edit logs.
        pipe_sender: The sender end of the pipe used to communicate with
            the daemon manager. It receives "Observer started." once the
            observer is running and is closed on exit.
    """
    event_handler = ClearcutEventHandler(
        path, flush_interval_sec, single_events_size_threshold, cclient
    )
    observer = Observer()

    logging.info("Starting observer on path %s.", path)
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    logging.info("Observer started.")
    if pipe_sender:
        pipe_sender.send("Observer started.")

    try:
        # Keep the main thread alive; the loop only ends when an exception
        # (e.g. from a signal handler or KeyboardInterrupt) is raised.
        while True:
            time.sleep(1)
    finally:
        # Nest the cleanup steps so that a failure while flushing (which
        # talks to the Clearcut client) cannot skip stopping the observer
        # or closing the pipe. The original exception still propagates.
        try:
            event_handler.flushall()
        finally:
            try:
                observer.stop()
                observer.join()
            finally:
                if pipe_sender:
                    pipe_sender.close()