1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
|
# Copyright 2024, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import getpass
import logging
import multiprocessing.connection
import os
import pathlib
import platform
import threading
import time
from atest.metrics import clearcut_client
from atest.proto import clientanalytics_pb2
from proto import edit_event_pb2
from watchdog.events import FileSystemEvent
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
# Enum of the Clearcut log source defined under
# /google3/wireless/android/play/playlog/proto/log_source_enum.proto
LOG_SOURCE = 2524
# Default delay, in seconds, between the first buffered edit event and the
# scheduled flush that sends the buffered events (default value of
# `flush_interval_sec` in start()).
DEFAULT_FLUSH_INTERVAL_SECONDS = 5
# Default limit on the number of events reported individually per flush; when
# more events than this are pending, a single aggregated event is sent instead
# (default value of `single_events_size_threshold` in start()).
DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100
class ClearcutEventHandler(PatternMatchingEventHandler):
  """File system event handler that reports edit events to Clearcut.

  Every non-directory event under the monitored root is considered. Events on
  hidden paths, or on files that are not inside a git project, are dropped.
  The remaining events are buffered and sent in batches: the first buffered
  event schedules a timer that flushes the buffer after `flush_interval_sec`
  seconds. When a batch holds more than `single_events_size_threshold` events,
  one aggregated event carrying only the edit count is sent instead.
  """

  def __init__(
      self,
      path: str,
      flush_interval_sec: int,
      single_events_size_threshold: int,
      is_dry_run: bool = False,
      cclient: clearcut_client.Clearcut | None = None,
  ):
    """Initializes the handler.

    Args:
      path: Root path being monitored; event paths are interpreted relative to
        it.
      flush_interval_sec: Delay in seconds between the first buffered event
        and the flush of the buffer.
      single_events_size_threshold: Maximum number of events in one flush that
        are still reported individually.
      is_dry_run: When True, events are collected and logged locally but never
        handed to the Clearcut client.
      cclient: Clearcut client used to send events. A client for LOG_SOURCE is
        created when omitted.
    """
    super().__init__(patterns=["*"], ignore_directories=True)
    self.root_monitoring_path = path
    self.flush_interval_sec = flush_interval_sec
    self.single_events_size_threshold = single_events_size_threshold
    self.is_dry_run = is_dry_run
    self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE)

    self.user_name = getpass.getuser()
    self.host_name = platform.node()
    self.source_root = os.environ.get("ANDROID_BUILD_TOP", "")

    # List of (EditEvent proto, wall-clock timestamp in seconds) tuples that
    # are waiting to be sent.
    self.pending_events = []
    # threading.Timer for the next scheduled flush, or None when none is
    # scheduled.
    self._scheduled_log_thread = None
    # Guards pending_events and _scheduled_log_thread.
    self._pending_events_lock = threading.Lock()

  def on_moved(self, event: FileSystemEvent):
    """Records a MOVE edit for the event's source path."""
    self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE)

  def on_created(self, event: FileSystemEvent):
    """Records a CREATE edit for the event's source path."""
    self._log_edit_event(event, edit_event_pb2.EditEvent.CREATE)

  def on_deleted(self, event: FileSystemEvent):
    """Records a DELETE edit for the event's source path."""
    self._log_edit_event(event, edit_event_pb2.EditEvent.DELETE)

  def on_modified(self, event: FileSystemEvent):
    """Records a MODIFY edit for the event's source path."""
    self._log_edit_event(event, edit_event_pb2.EditEvent.MODIFY)

  def flushall(self):
    """Cancels any scheduled flush and sends all pending events right away."""
    logging.info("flushing all pending events.")
    # Take ownership of the timer under the lock. The timer thread clears
    # _scheduled_log_thread when it fires, so checking the attribute and then
    # dereferencing it without the lock could end up calling cancel() on None.
    with self._pending_events_lock:
      scheduled_timer = self._scheduled_log_thread
      self._scheduled_log_thread = None
    if scheduled_timer:
      logging.info("canceling log thread")
      scheduled_timer.cancel()

    # The lock is not reentrant and _log_clearcut_events acquires it itself,
    # so it must be released before this call.
    self._log_clearcut_events()
    self.cclient.flush_events()

  def _log_edit_event(
      self, event: FileSystemEvent, edit_type: edit_event_pb2.EditEvent.EditType
  ):
    """Buffers one edit event and schedules a flush if none is pending.

    Hidden files and files outside of any git project are ignored. Failures
    are logged and never propagated to the caller.

    Args:
      event: The file system event to record; only `src_path` is reported.
      edit_type: The kind of edit stored in the event proto.
    """
    try:
      event_time = time.time()

      if self._is_hidden_file(pathlib.Path(event.src_path)):
        logging.debug("ignore hidden file: %s.", event.src_path)
        return

      if not self._is_under_git_project(pathlib.Path(event.src_path)):
        logging.debug(
            "ignore file %s which does not belong to a git project",
            event.src_path,
        )
        return

      logging.info("%s: %s", event.event_type, event.src_path)

      event_proto = edit_event_pb2.EditEvent(
          user_name=self.user_name,
          host_name=self.host_name,
          source_root=self.source_root,
      )
      event_proto.single_edit_event.CopyFrom(
          edit_event_pb2.EditEvent.SingleEditEvent(
              file_path=event.src_path, edit_type=edit_type
          )
      )
      with self._pending_events_lock:
        self.pending_events.append((event_proto, event_time))
        # Only the first event of a batch starts the timer; later events join
        # the batch that is already scheduled.
        if not self._scheduled_log_thread:
          logging.debug(
              "Scheduling thread to run in %d seconds", self.flush_interval_sec
          )
          self._scheduled_log_thread = threading.Timer(
              self.flush_interval_sec, self._log_clearcut_events
          )
          self._scheduled_log_thread.start()

    except Exception:
      # Best effort: a failure to record one edit is logged and swallowed
      # rather than raised out of the event callback.
      logging.exception("Failed to log edit event.")

  def _is_hidden_file(self, file_path: pathlib.Path) -> bool:
    """Returns True if any path component below the root starts with '.'.

    Raises:
      ValueError: If `file_path` is not located under the monitored root.
    """
    return any(
        part.startswith(".")
        for part in file_path.relative_to(self.root_monitoring_path).parts
    )

  def _is_under_git_project(self, file_path: pathlib.Path) -> bool:
    """Returns True if a directory between the root and the file has `.git`.

    The monitored root itself is included in the directories checked.

    Raises:
      ValueError: If `file_path` is not located under the monitored root.
    """
    # Compute the relative path against the root exactly as it was given, the
    # same way _is_hidden_file does. Using the resolved root here would raise
    # ValueError for every event whenever the root is a symlink or a relative
    # path, because the unresolved event path does not start with it.
    relative_path = file_path.relative_to(self.root_monitoring_path)
    resolved_root = pathlib.Path(self.root_monitoring_path).resolve()
    return any(
        resolved_root.joinpath(parent_dir).joinpath(".git").exists()
        for parent_dir in relative_path.parents
    )

  def _log_clearcut_events(self):
    """Sends the buffered events, aggregating them when there are too many.

    Runs on the timer thread when a scheduled flush fires, and is also called
    directly by flushall().
    """
    # Swap the buffer out under the lock so new events can keep arriving while
    # this batch is being sent.
    with self._pending_events_lock:
      self._scheduled_log_thread = None
      edit_events = self.pending_events
      self.pending_events = []

    pending_events_size = len(edit_events)
    if pending_events_size > self.single_events_size_threshold:
      logging.info(
          "got %d events in %d seconds, sending aggregated events instead",
          pending_events_size,
          self.flush_interval_sec,
      )
      # The aggregated event carries the timestamp of the first event in the
      # batch.
      aggregated_event_time = edit_events[0][1]
      aggregated_event_proto = edit_event_pb2.EditEvent(
          user_name=self.user_name,
          host_name=self.host_name,
          source_root=self.source_root,
      )
      aggregated_event_proto.aggregated_edit_event.CopyFrom(
          edit_event_pb2.EditEvent.AggregatedEditEvent(
              num_edits=pending_events_size
          )
      )
      edit_events = [(aggregated_event_proto, aggregated_event_time)]

    if self.is_dry_run:
      logging.info("Sent %d edit events in dry run.", len(edit_events))
      return

    for event_proto, event_time in edit_events:
      log_event = clientanalytics_pb2.LogEvent(
          # Timestamps are recorded in seconds; the log event takes
          # milliseconds.
          event_time_ms=int(event_time * 1000),
          source_extension=event_proto.SerializeToString(),
      )
      self.cclient.log(log_event)

    logging.info("sent %d edit events", len(edit_events))
def start(
    path: str,
    is_dry_run: bool = False,
    flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS,
    single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD,
    cclient: clearcut_client.Clearcut | None = None,
    pipe_sender: multiprocessing.connection.Connection | None = None,
):
  """Method to start the edit monitor.

  This is the entry point to start the edit monitor as a subprocess of
  the daemon manager. The call blocks until it is interrupted by an
  exception; pending events are then flushed and the observer is shut down.

  params:
    path: The root path to monitor.
    is_dry_run: If True, edit events are collected and logged locally but not
      sent through the clearcut client.
    flush_interval_sec: Delay in seconds between the first buffered edit event
      and the flush of the buffered events.
    single_events_size_threshold: Maximum number of events in one flush that
      are sent individually; larger batches are sent as one aggregated event.
    cclient: The clearcut client to send the edit logs.
    pipe_sender: The sender of the pipe to communicate with the daemon
      manager. It receives a message once the observer has started and is
      closed on exit.
  """
  event_handler = ClearcutEventHandler(
      path, flush_interval_sec, single_events_size_threshold, is_dry_run, cclient
  )
  observer = Observer()

  logging.info("Starting observer on path %s.", path)
  observer.schedule(event_handler, path, recursive=True)
  observer.start()
  logging.info("Observer started.")
  if pipe_sender:
    pipe_sender.send("Observer started.")

  try:
    while True:
      time.sleep(1)
  finally:
    # Each cleanup step runs even if the previous one raised, so a failing
    # flush cannot leave the observer running or the pipe open.
    try:
      event_handler.flushall()
    finally:
      try:
        observer.stop()
        observer.join()
      finally:
        if pipe_sender:
          pipe_sender.close()
|