author Zhuoyao Zhang <zhuoyao@google.com> 2024-10-09 18:04:39 +0000
committer Zhuoyao Zhang <zhuoyao@google.com> 2024-10-23 22:14:34 +0000
commit 8a2257965207cf117a33bc52dc9e39c94e35a343 (patch)
tree c5ff4af3782df27c7ec73a7e0be6024f61a4a8de
parent 35bd3d27b96af9bf77a0a1e69358b02550c063a5 (diff)
Performance optimization for edit monitor
Instead of logging every edit event immediately when it is received, cache the events and log the cached events in batches periodically. When many edit events are received in a short time (likely due to a non-human operation such as a repo sync), send a single aggregated edit event instead to prevent performance degradation.

Test: atest edit_monitor_test
Bug: 365617369
Change-Id: Ibe1613cf1e2eb37ebc5dfa5c029b990854fcf91e
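The change implements a batch-and-aggregate pattern: incoming events are cached under a lock, a single threading.Timer is armed when the first pending event arrives, and the timer callback either logs each cached event or collapses them into one aggregated record once the count exceeds a threshold. The following is a minimal, self-contained sketch of that pattern; BatchingLogger, log_fn, and the dict payload are illustrative stand-ins, not the actual ClearcutEventHandler API.

import threading
import time

# Defaults mirroring the values introduced in this change.
FLUSH_INTERVAL_SEC = 5
SINGLE_EVENTS_SIZE_THRESHOLD = 100


class BatchingLogger:
  """Caches events and flushes them in batches on a timer (illustrative only)."""

  def __init__(self, log_fn, flush_interval_sec=FLUSH_INTERVAL_SEC,
               single_events_size_threshold=SINGLE_EVENTS_SIZE_THRESHOLD):
    self._log_fn = log_fn  # stand-in for cclient.log in the real handler
    self._flush_interval_sec = flush_interval_sec
    self._threshold = single_events_size_threshold
    self._pending_events = []
    self._timer = None
    self._lock = threading.Lock()

  def add_event(self, event):
    with self._lock:
      self._pending_events.append((event, time.time()))
      # Arm a single deferred flush when the first pending event arrives.
      if not self._timer:
        self._timer = threading.Timer(self._flush_interval_sec, self.flush)
        self._timer.start()

  def flush(self):
    with self._lock:
      # Clear the timer handle and take ownership of the pending batch.
      self._timer = None
      events, self._pending_events = self._pending_events, []

    if len(events) > self._threshold:
      # Too many events in one interval (e.g. a repo sync): collapse them
      # into one aggregated record instead of logging each individually.
      events = [({"aggregated": True, "num_edits": len(events)}, events[0][1])]

    for event, event_time in events:
      self._log_fn(event, event_time)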
-rw-r--r--  tools/edit_monitor/daemon_manager.py     9
-rw-r--r--  tools/edit_monitor/edit_monitor.py       76
-rw-r--r--  tools/edit_monitor/edit_monitor_test.py  40
3 files changed, 114 insertions, 11 deletions
diff --git a/tools/edit_monitor/daemon_manager.py b/tools/edit_monitor/daemon_manager.py
index 892c292a71..4ff4ec87cf 100644
--- a/tools/edit_monitor/daemon_manager.py
+++ b/tools/edit_monitor/daemon_manager.py
@@ -133,8 +133,12 @@ class DaemonManager:
logging.debug("in daemon manager cleanup.")
try:
- if self.daemon_process and self.daemon_process.is_alive():
- self._terminate_process(self.daemon_process.pid)
+ if self.daemon_process:
+ # The daemon process might already be in the process of terminating,
+ # so wait some time before killing it explicitly.
+ self._wait_for_process_terminate(self.daemon_process.pid, 1)
+ if self.daemon_process.is_alive():
+ self._terminate_process(self.daemon_process.pid)
self._remove_pidfile()
logging.debug("Successfully stopped daemon manager.")
except Exception as e:
@@ -227,6 +231,7 @@ class DaemonManager:
p = multiprocessing.Process(
target=self.daemon_target, args=self.daemon_args
)
+ p.daemon = True
p.start()
logging.info("Start subprocess with PID %d", p.pid)
diff --git a/tools/edit_monitor/edit_monitor.py b/tools/edit_monitor/edit_monitor.py
index defc84186f..31115d4cb4 100644
--- a/tools/edit_monitor/edit_monitor.py
+++ b/tools/edit_monitor/edit_monitor.py
@@ -19,6 +19,7 @@ import multiprocessing.connection
import os
import pathlib
import platform
+import threading
import time
from atest.metrics import clearcut_client
@@ -31,22 +32,34 @@ from watchdog.observers import Observer
# Enum of the Clearcut log source defined under
# /google3/wireless/android/play/playlog/proto/log_source_enum.proto
LOG_SOURCE = 2524
+DEFAULT_FLUSH_INTERVAL_SECONDS = 5
+DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100
class ClearcutEventHandler(PatternMatchingEventHandler):
def __init__(
- self, path: str, cclient: clearcut_client.Clearcut | None = None
+ self,
+ path: str,
+ flush_interval_sec: int,
+ single_events_size_threshold: int,
+ cclient: clearcut_client.Clearcut | None = None,
):
super().__init__(patterns=["*"], ignore_directories=True)
self.root_monitoring_path = path
+ self.flush_interval_sec = flush_interval_sec
+ self.single_events_size_threshold = single_events_size_threshold
self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE)
self.user_name = getpass.getuser()
self.host_name = platform.node()
self.source_root = os.environ.get("ANDROID_BUILD_TOP", "")
+ self.pending_events = []
+ self._scheduled_log_thread = None
+ self._pending_events_lock = threading.Lock()
+
def on_moved(self, event: FileSystemEvent):
self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE)
@@ -61,6 +74,12 @@ class ClearcutEventHandler(PatternMatchingEventHandler):
def flushall(self):
logging.info("flushing all pending events.")
+ if self._scheduled_log_thread:
+ logging.info("canceling log thread")
+ self._scheduled_log_thread.cancel()
+ self._scheduled_log_thread = None
+
+ self._log_clearcut_events()
self.cclient.flush_events()
def _log_edit_event(
@@ -92,12 +111,17 @@ class ClearcutEventHandler(PatternMatchingEventHandler):
file_path=event.src_path, edit_type=edit_type
)
)
- clearcut_log_event = clientanalytics_pb2.LogEvent(
- event_time_ms=int(event_time * 1000),
- source_extension=event_proto.SerializeToString(),
- )
+ with self._pending_events_lock:
+ self.pending_events.append((event_proto, event_time))
+ if not self._scheduled_log_thread:
+ logging.debug(
+ "Scheduling thread to run in %d seconds", self.flush_interval_sec
+ )
+ self._scheduled_log_thread = threading.Timer(
+ self.flush_interval_sec, self._log_clearcut_events
+ )
+ self._scheduled_log_thread.start()
- self.cclient.log(clearcut_log_event)
except Exception:
logging.exception("Failed to log edit event.")
@@ -114,9 +138,46 @@ class ClearcutEventHandler(PatternMatchingEventHandler):
for dir in file_path.relative_to(root_path).parents
)
+ def _log_clearcut_events(self):
+ with self._pending_events_lock:
+ self._scheduled_log_thread = None
+ edit_events = self.pending_events
+ self.pending_events = []
+
+ pending_events_size = len(edit_events)
+ if pending_events_size > self.single_events_size_threshold:
+ logging.info(
+ "got %d events in %d seconds, sending aggregated events instead",
+ pending_events_size,
+ self.flush_interval_sec,
+ )
+ aggregated_event_time = edit_events[0][1]
+ aggregated_event_proto = edit_event_pb2.EditEvent(
+ user_name=self.user_name,
+ host_name=self.host_name,
+ source_root=self.source_root,
+ )
+ aggregated_event_proto.aggregated_edit_event.CopyFrom(
+ edit_event_pb2.EditEvent.AggregatedEditEvent(
+ num_edits=pending_events_size
+ )
+ )
+ edit_events = [(aggregated_event_proto, aggregated_event_time)]
+
+ for event_proto, event_time in edit_events:
+ log_event = clientanalytics_pb2.LogEvent(
+ event_time_ms=int(event_time * 1000),
+ source_extension=event_proto.SerializeToString(),
+ )
+ self.cclient.log(log_event)
+
+ logging.info("sent %d edit events", len(edit_events))
+
def start(
path: str,
+ flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS,
+ single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD,
cclient: clearcut_client.Clearcut | None = None,
pipe_sender: multiprocessing.connection.Connection | None = None,
):
@@ -130,7 +191,8 @@ def start(
cclient: The clearcut client to send the edit logs.
conn: the sender of the pipe to communicate with the daemon manager.
"""
- event_handler = ClearcutEventHandler(path, cclient)
+ event_handler = ClearcutEventHandler(
+ path, flush_interval_sec, single_events_size_threshold, cclient)
observer = Observer()
logging.info("Starting observer on path %s.", path)
diff --git a/tools/edit_monitor/edit_monitor_test.py b/tools/edit_monitor/edit_monitor_test.py
index 5bc1b1371c..4ec3284e6a 100644
--- a/tools/edit_monitor/edit_monitor_test.py
+++ b/tools/edit_monitor/edit_monitor_test.py
@@ -53,7 +53,7 @@ class EditMonitorTest(unittest.TestCase):
self.working_dir.cleanup()
super().tearDown()
- def test_log_edit_event_success(self):
+ def test_log_single_edit_event_success(self):
# Create the .git file under the monitoring dir.
self.root_monitoring_path.joinpath('.git').touch()
fake_cclient = FakeClearcutClient(
@@ -127,6 +127,42 @@ class EditMonitorTest(unittest.TestCase):
).single_edit_event,
)
+
+ def test_log_aggregated_edit_event_success(self):
+ # Create the .git file under the monitoring dir.
+ self.root_monitoring_path.joinpath('.git').touch()
+ fake_cclient = FakeClearcutClient(
+ log_output_file=self.log_event_dir.joinpath('logs.output')
+ )
+ p = self._start_test_edit_monitor_process(fake_cclient)
+
+ # Create 6 test files
+ for i in range(6):
+ test_file = self.root_monitoring_path.joinpath('test_' + str(i))
+ test_file.touch()
+
+ # Give some time for the edit monitor to receive the edit event.
+ time.sleep(1)
+ # Stop the edit monitor and flush all events.
+ os.kill(p.pid, signal.SIGINT)
+ p.join()
+
+ logged_events = self._get_logged_events()
+ self.assertEqual(len(logged_events), 1)
+
+ expected_aggregated_edit_event = (
+ edit_event_pb2.EditEvent.AggregatedEditEvent(
+ num_edits=6,
+ )
+ )
+
+ self.assertEqual(
+ expected_aggregated_edit_event,
+ edit_event_pb2.EditEvent.FromString(
+ logged_events[0].source_extension
+ ).aggregated_edit_event,
+ )
+
def test_do_not_log_edit_event_for_directory_change(self):
# Create the .git file under the monitoring dir.
self.root_monitoring_path.joinpath('.git').touch()
@@ -217,7 +253,7 @@ class EditMonitorTest(unittest.TestCase):
# Start edit monitor in a subprocess.
p = multiprocessing.Process(
target=edit_monitor.start,
- args=(str(self.root_monitoring_path.resolve()), cclient, sender),
+ args=(str(self.root_monitoring_path.resolve()), 0.5, 5, cclient, sender),
)
p.daemon = True
p.start()