diff options
-rw-r--r-- | tools/edit_monitor/daemon_manager.py | 9 | ||||
-rw-r--r-- | tools/edit_monitor/edit_monitor.py | 76 | ||||
-rw-r--r-- | tools/edit_monitor/edit_monitor_test.py | 40 |
3 files changed, 114 insertions, 11 deletions
diff --git a/tools/edit_monitor/daemon_manager.py b/tools/edit_monitor/daemon_manager.py index 892c292a71..4ff4ec87cf 100644 --- a/tools/edit_monitor/daemon_manager.py +++ b/tools/edit_monitor/daemon_manager.py @@ -133,8 +133,12 @@ class DaemonManager: logging.debug("in daemon manager cleanup.") try: - if self.daemon_process and self.daemon_process.is_alive(): - self._terminate_process(self.daemon_process.pid) + if self.daemon_process: + # The daemon process might already be terminating; + # wait some time before killing it explicitly. + self._wait_for_process_terminate(self.daemon_process.pid, 1) + if self.daemon_process.is_alive(): + self._terminate_process(self.daemon_process.pid) self._remove_pidfile() logging.debug("Successfully stopped daemon manager.") except Exception as e: @@ -227,6 +231,7 @@ class DaemonManager: p = multiprocessing.Process( target=self.daemon_target, args=self.daemon_args ) + p.daemon = True p.start() logging.info("Start subprocess with PID %d", p.pid) diff --git a/tools/edit_monitor/edit_monitor.py b/tools/edit_monitor/edit_monitor.py index defc84186f..31115d4cb4 100644 --- a/tools/edit_monitor/edit_monitor.py +++ b/tools/edit_monitor/edit_monitor.py @@ -19,6 +19,7 @@ import multiprocessing.connection import os import pathlib import platform +import threading import time from atest.metrics import clearcut_client @@ -31,22 +32,34 @@ from watchdog.observers import Observer # Enum of the Clearcut log source defined under # /google3/wireless/android/play/playlog/proto/log_source_enum.proto LOG_SOURCE = 2524 +DEFAULT_FLUSH_INTERVAL_SECONDS = 5 +DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100 class ClearcutEventHandler(PatternMatchingEventHandler): def __init__( - self, path: str, cclient: clearcut_client.Clearcut | None = None + self, + path: str, + flush_interval_sec: int, + single_events_size_threshold: int, + cclient: clearcut_client.Clearcut | None = None, ): super().__init__(patterns=["*"], ignore_directories=True) 
self.root_monitoring_path = path + self.flush_interval_sec = flush_interval_sec + self.single_events_size_threshold = single_events_size_threshold self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE) self.user_name = getpass.getuser() self.host_name = platform.node() self.source_root = os.environ.get("ANDROID_BUILD_TOP", "") + self.pending_events = [] + self._scheduled_log_thread = None + self._pending_events_lock = threading.Lock() + def on_moved(self, event: FileSystemEvent): self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE) @@ -61,6 +74,12 @@ class ClearcutEventHandler(PatternMatchingEventHandler): def flushall(self): logging.info("flushing all pending events.") + if self._scheduled_log_thread: + logging.info("canceling log thread") + self._scheduled_log_thread.cancel() + self._scheduled_log_thread = None + + self._log_clearcut_events() self.cclient.flush_events() def _log_edit_event( @@ -92,12 +111,17 @@ class ClearcutEventHandler(PatternMatchingEventHandler): file_path=event.src_path, edit_type=edit_type ) ) - clearcut_log_event = clientanalytics_pb2.LogEvent( - event_time_ms=int(event_time * 1000), - source_extension=event_proto.SerializeToString(), - ) + with self._pending_events_lock: + self.pending_events.append((event_proto, event_time)) + if not self._scheduled_log_thread: + logging.debug( + "Scheduling thread to run in %d seconds", self.flush_interval_sec + ) + self._scheduled_log_thread = threading.Timer( + self.flush_interval_sec, self._log_clearcut_events + ) + self._scheduled_log_thread.start() - self.cclient.log(clearcut_log_event) except Exception: logging.exception("Failed to log edit event.") @@ -114,9 +138,46 @@ class ClearcutEventHandler(PatternMatchingEventHandler): for dir in file_path.relative_to(root_path).parents ) + def _log_clearcut_events(self): + with self._pending_events_lock: + self._scheduled_log_thread = None + edit_events = self.pending_events + self.pending_events = [] + + pending_events_size = 
len(edit_events) + if pending_events_size > self.single_events_size_threshold: + logging.info( + "got %d events in %d seconds, sending aggregated events instead", + pending_events_size, + self.flush_interval_sec, + ) + aggregated_event_time = edit_events[0][1] + aggregated_event_proto = edit_event_pb2.EditEvent( + user_name=self.user_name, + host_name=self.host_name, + source_root=self.source_root, + ) + aggregated_event_proto.aggregated_edit_event.CopyFrom( + edit_event_pb2.EditEvent.AggregatedEditEvent( + num_edits=pending_events_size + ) + ) + edit_events = [(aggregated_event_proto, aggregated_event_time)] + + for event_proto, event_time in edit_events: + log_event = clientanalytics_pb2.LogEvent( + event_time_ms=int(event_time * 1000), + source_extension=event_proto.SerializeToString(), + ) + self.cclient.log(log_event) + + logging.info("sent %d edit events", len(edit_events)) + def start( path: str, + flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS, + single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD, cclient: clearcut_client.Clearcut | None = None, pipe_sender: multiprocessing.connection.Connection | None = None, ): @@ -130,7 +191,8 @@ def start( cclient: The clearcut client to send the edit logs. conn: the sender of the pipe to communicate with the daemon manager. 
""" - event_handler = ClearcutEventHandler(path, cclient) + event_handler = ClearcutEventHandler( + path, flush_interval_sec, single_events_size_threshold, cclient) observer = Observer() logging.info("Starting observer on path %s.", path) diff --git a/tools/edit_monitor/edit_monitor_test.py b/tools/edit_monitor/edit_monitor_test.py index 5bc1b1371c..4ec3284e6a 100644 --- a/tools/edit_monitor/edit_monitor_test.py +++ b/tools/edit_monitor/edit_monitor_test.py @@ -53,7 +53,7 @@ class EditMonitorTest(unittest.TestCase): self.working_dir.cleanup() super().tearDown() - def test_log_edit_event_success(self): + def test_log_single_edit_event_success(self): # Create the .git file under the monitoring dir. self.root_monitoring_path.joinpath('.git').touch() fake_cclient = FakeClearcutClient( @@ -127,6 +127,42 @@ class EditMonitorTest(unittest.TestCase): ).single_edit_event, ) + + def test_log_aggregated_edit_event_success(self): + # Create the .git file under the monitoring dir. + self.root_monitoring_path.joinpath('.git').touch() + fake_cclient = FakeClearcutClient( + log_output_file=self.log_event_dir.joinpath('logs.output') + ) + p = self._start_test_edit_monitor_process(fake_cclient) + + # Create 6 test files + for i in range(6): + test_file = self.root_monitoring_path.joinpath('test_' + str(i)) + test_file.touch() + + # Give some time for the edit monitor to receive the edit event. + time.sleep(1) + # Stop the edit monitor and flush all events. + os.kill(p.pid, signal.SIGINT) + p.join() + + logged_events = self._get_logged_events() + self.assertEqual(len(logged_events), 1) + + expected_aggregated_edit_event = ( + edit_event_pb2.EditEvent.AggregatedEditEvent( + num_edits=6, + ) + ) + + self.assertEqual( + expected_aggregated_edit_event, + edit_event_pb2.EditEvent.FromString( + logged_events[0].source_extension + ).aggregated_edit_event, + ) + def test_do_not_log_edit_event_for_directory_change(self): # Create the .git file under the monitoring dir. 
self.root_monitoring_path.joinpath('.git').touch() @@ -217,7 +253,7 @@ class EditMonitorTest(unittest.TestCase): # Start edit monitor in a subprocess. p = multiprocessing.Process( target=edit_monitor.start, - args=(str(self.root_monitoring_path.resolve()), cclient, sender), + args=(str(self.root_monitoring_path.resolve()), 0.5, 5, cclient, sender), ) p.daemon = True p.start() |