diff options
author | 2024-10-09 18:04:39 +0000 | |
---|---|---|
committer | 2024-10-23 22:14:34 +0000 | |
commit | 8a2257965207cf117a33bc52dc9e39c94e35a343 (patch) | |
tree | c5ff4af3782df27c7ec73a7e0be6024f61a4a8de | |
parent | 35bd3d27b96af9bf77a0a1e69358b02550c063a5 (diff) |
Performance optimization for edit monitor
Instead of log every edit event immediately when received the event,
cache the events and log the cached events in batch periodically. In
case when there are many edits events recieved in a short time (probably
due to non-human operation like repo sync), send an aggregated edit
event instead to prevent performance degrade.
Test: atest edit_monitor_test
Bug: 365617369
Change-Id: Ibe1613cf1e2eb37ebc5dfa5c029b990854fcf91e
-rw-r--r-- | tools/edit_monitor/daemon_manager.py | 9 | ||||
-rw-r--r-- | tools/edit_monitor/edit_monitor.py | 76 | ||||
-rw-r--r-- | tools/edit_monitor/edit_monitor_test.py | 40 |
3 files changed, 114 insertions, 11 deletions
diff --git a/tools/edit_monitor/daemon_manager.py b/tools/edit_monitor/daemon_manager.py index 892c292a71..4ff4ec87cf 100644 --- a/tools/edit_monitor/daemon_manager.py +++ b/tools/edit_monitor/daemon_manager.py @@ -133,8 +133,12 @@ class DaemonManager: logging.debug("in daemon manager cleanup.") try: - if self.daemon_process and self.daemon_process.is_alive(): - self._terminate_process(self.daemon_process.pid) + if self.daemon_process: + # The daemon process might already in termination process, + # wait some time before kill it explicitly. + self._wait_for_process_terminate(self.daemon_process.pid, 1) + if self.daemon_process.is_alive(): + self._terminate_process(self.daemon_process.pid) self._remove_pidfile() logging.debug("Successfully stopped daemon manager.") except Exception as e: @@ -227,6 +231,7 @@ class DaemonManager: p = multiprocessing.Process( target=self.daemon_target, args=self.daemon_args ) + p.daemon = True p.start() logging.info("Start subprocess with PID %d", p.pid) diff --git a/tools/edit_monitor/edit_monitor.py b/tools/edit_monitor/edit_monitor.py index defc84186f..31115d4cb4 100644 --- a/tools/edit_monitor/edit_monitor.py +++ b/tools/edit_monitor/edit_monitor.py @@ -19,6 +19,7 @@ import multiprocessing.connection import os import pathlib import platform +import threading import time from atest.metrics import clearcut_client @@ -31,22 +32,34 @@ from watchdog.observers import Observer # Enum of the Clearcut log source defined under # /google3/wireless/android/play/playlog/proto/log_source_enum.proto LOG_SOURCE = 2524 +DEFAULT_FLUSH_INTERVAL_SECONDS = 5 +DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100 class ClearcutEventHandler(PatternMatchingEventHandler): def __init__( - self, path: str, cclient: clearcut_client.Clearcut | None = None + self, + path: str, + flush_interval_sec: int, + single_events_size_threshold: int, + cclient: clearcut_client.Clearcut | None = None, ): super().__init__(patterns=["*"], ignore_directories=True) self.root_monitoring_path = path + self.flush_interval_sec = flush_interval_sec + self.single_events_size_threshold = single_events_size_threshold self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE) self.user_name = getpass.getuser() self.host_name = platform.node() self.source_root = os.environ.get("ANDROID_BUILD_TOP", "") + self.pending_events = [] + self._scheduled_log_thread = None + self._pending_events_lock = threading.Lock() + def on_moved(self, event: FileSystemEvent): self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE) @@ -61,6 +74,12 @@ class ClearcutEventHandler(PatternMatchingEventHandler): def flushall(self): logging.info("flushing all pending events.") + if self._scheduled_log_thread: + logging.info("canceling log thread") + self._scheduled_log_thread.cancel() + self._scheduled_log_thread = None + + self._log_clearcut_events() self.cclient.flush_events() def _log_edit_event( @@ -92,12 +111,17 @@ class ClearcutEventHandler(PatternMatchingEventHandler): file_path=event.src_path, edit_type=edit_type ) ) - clearcut_log_event = clientanalytics_pb2.LogEvent( - event_time_ms=int(event_time * 1000), - source_extension=event_proto.SerializeToString(), - ) + with self._pending_events_lock: + self.pending_events.append((event_proto, event_time)) + if not self._scheduled_log_thread: + logging.debug( + "Scheduling thread to run in %d seconds", self.flush_interval_sec + ) + self._scheduled_log_thread = threading.Timer( + self.flush_interval_sec, self._log_clearcut_events + ) + self._scheduled_log_thread.start() - self.cclient.log(clearcut_log_event) except Exception: logging.exception("Failed to log edit event.") @@ -114,9 +138,46 @@ class ClearcutEventHandler(PatternMatchingEventHandler): for dir in file_path.relative_to(root_path).parents ) + def _log_clearcut_events(self): + with self._pending_events_lock: + self._scheduled_log_thread = None + edit_events = self.pending_events + self.pending_events = [] + + pending_events_size = len(edit_events) + if pending_events_size > self.single_events_size_threshold: + logging.info( + "got %d events in %d seconds, sending aggregated events instead", + pending_events_size, + self.flush_interval_sec, + ) + aggregated_event_time = edit_events[0][1] + aggregated_event_proto = edit_event_pb2.EditEvent( + user_name=self.user_name, + host_name=self.host_name, + source_root=self.source_root, + ) + aggregated_event_proto.aggregated_edit_event.CopyFrom( + edit_event_pb2.EditEvent.AggregatedEditEvent( + num_edits=pending_events_size + ) + ) + edit_events = [(aggregated_event_proto, aggregated_event_time)] + + for event_proto, event_time in edit_events: + log_event = clientanalytics_pb2.LogEvent( + event_time_ms=int(event_time * 1000), + source_extension=event_proto.SerializeToString(), + ) + self.cclient.log(log_event) + + logging.info("sent %d edit events", len(edit_events)) + def start( path: str, + flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS, + single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD, cclient: clearcut_client.Clearcut | None = None, pipe_sender: multiprocessing.connection.Connection | None = None, ): @@ -130,7 +191,8 @@ def start( cclient: The clearcut client to send the edit logs. conn: the sender of the pipe to communicate with the deamon manager. """ - event_handler = ClearcutEventHandler(path, cclient) + event_handler = ClearcutEventHandler( + path, flush_interval_sec, single_events_size_threshold, cclient) observer = Observer() logging.info("Starting observer on path %s.", path) diff --git a/tools/edit_monitor/edit_monitor_test.py b/tools/edit_monitor/edit_monitor_test.py index 5bc1b1371c..4ec3284e6a 100644 --- a/tools/edit_monitor/edit_monitor_test.py +++ b/tools/edit_monitor/edit_monitor_test.py @@ -53,7 +53,7 @@ class EditMonitorTest(unittest.TestCase): self.working_dir.cleanup() super().tearDown() - def test_log_edit_event_success(self): + def test_log_single_edit_event_success(self): # Create the .git file under the monitoring dir. self.root_monitoring_path.joinpath('.git').touch() fake_cclient = FakeClearcutClient( @@ -127,6 +127,42 @@ class EditMonitorTest(unittest.TestCase): ).single_edit_event, ) + + def test_log_aggregated_edit_event_success(self): + # Create the .git file under the monitoring dir. + self.root_monitoring_path.joinpath('.git').touch() + fake_cclient = FakeClearcutClient( + log_output_file=self.log_event_dir.joinpath('logs.output') + ) + p = self._start_test_edit_monitor_process(fake_cclient) + + # Create 6 test files + for i in range(6): + test_file = self.root_monitoring_path.joinpath('test_' + str(i)) + test_file.touch() + + # Give some time for the edit monitor to receive the edit event. + time.sleep(1) + # Stop the edit monitor and flush all events. + os.kill(p.pid, signal.SIGINT) + p.join() + + logged_events = self._get_logged_events() + self.assertEqual(len(logged_events), 1) + + expected_aggregated_edit_event = ( + edit_event_pb2.EditEvent.AggregatedEditEvent( + num_edits=6, + ) + ) + + self.assertEqual( + expected_aggregated_edit_event, + edit_event_pb2.EditEvent.FromString( + logged_events[0].source_extension + ).aggregated_edit_event, + ) + def test_do_not_log_edit_event_for_directory_change(self): # Create the .git file under the monitoring dir. self.root_monitoring_path.joinpath('.git').touch() @@ -217,7 +253,7 @@ class EditMonitorTest(unittest.TestCase): # Start edit monitor in a subprocess. p = multiprocessing.Process( target=edit_monitor.start, - args=(str(self.root_monitoring_path.resolve()), cclient, sender), + args=(str(self.root_monitoring_path.resolve()), 0.5, 5, cclient, sender), ) p.daemon = True p.start() |