Use concurrent.futures.ThreadPoolExecutor for testrunner threads

Moves from controlling threads with a semaphore to a slightly more
convenient thread pool.

Moves printing to be on the main thread.

Removes unnecessary thread for supervising tests.

Bug: 129406631
Test: art/test/testrunner.py --dry-run (and simulate failures locally)
Change-Id: I23244d35afa2341f1696ebc60cd009fb133ba872
diff --git a/test/testrunner/testrunner.py b/test/testrunner/testrunner.py
index 461887e..ac1b2c6 100755
--- a/test/testrunner/testrunner.py
+++ b/test/testrunner/testrunner.py
@@ -46,6 +46,7 @@
 """
 import argparse
 import collections
+import concurrent.futures
 import contextlib
 import fnmatch
 import itertools
@@ -58,7 +59,6 @@
 import subprocess
 import sys
 import tempfile
-import threading
 import time
 
 import env
@@ -89,27 +89,15 @@
 COLOR_SKIP = '\033[93m'
 COLOR_NORMAL = '\033[0m'
 
-# The mutex object is used by the threads for exclusive access of test_count
-# to make any changes in its value.
-test_count_mutex = threading.Lock()
-
 # The set contains the list of all the possible run tests that are in art/test
 # directory.
 RUN_TEST_SET = set()
 
-# The semaphore object is used by the testrunner to limit the number of
-# threads to the user requested concurrency value.
-semaphore = threading.Semaphore(1)
-
-# The mutex object is used to provide exclusive access to a thread to print
-# its output.
-print_mutex = threading.Lock()
 failed_tests = []
 skipped_tests = []
 
 # Flags
 n_thread = -1
-test_count = 0
 total_test_count = 0
 verbose = False
 dry_run = False
@@ -121,7 +109,6 @@
 with_agent = []
 zipapex_loc = None
 run_test_option = []
-stop_testrunner = False
 dex2oat_jobs = -1   # -1 corresponds to default threads for dex2oat
 run_all_configs = False
 
@@ -236,9 +223,6 @@
   for target in _user_input_variants['target']:
     extra_arguments[target] = find_extra_device_arguments(target)
 
-  global semaphore
-  semaphore = threading.Semaphore(n_thread)
-
   if not sys.stdout.isatty():
     global COLOR_ERROR
     global COLOR_PASS
@@ -277,15 +261,7 @@
     return "UNKNOWN_TARGET"
 
 def run_tests(tests):
-  """Creates thread workers to run the tests.
-
-  The method generates command and thread worker to run the tests. Depending on
-  the user input for the number of threads to be used, the method uses a
-  semaphore object to keep a count in control for the thread workers. When a new
-  worker is created, it acquires the semaphore object, and when the number of
-  workers reaches the maximum allowed concurrency, the method wait for an
-  existing thread worker to release the semaphore object. Worker releases the
-  semaphore object when they finish printing the output.
+  """This method generates variants of the tests to be run and executes them.
 
   Args:
     tests: The set of tests to be run.
@@ -362,18 +338,10 @@
       'debuggable': [''], 'jvmti': [''],
       'cdex_level': ['']})
 
-  def start_combination(config_tuple, global_options, address_size):
+  def start_combination(executor, config_tuple, global_options, address_size):
       test, target, run, prebuild, compiler, relocate, trace, gc, \
       jni, image, debuggable, jvmti, cdex_level = config_tuple
 
-      if stop_testrunner:
-        # When ART_TEST_KEEP_GOING is set to false, then as soon as a test
-        # fails, stop_testrunner is set to True. When this happens, the method
-        # stops creating any any thread and wait for all the exising threads
-        # to end.
-        while threading.active_count() > 2:
-          time.sleep(0.1)
-          return
       # NB The order of components here should match the order of
       # components in the regex parser in parse_test_name.
       test_name = 'test-art-'
@@ -498,25 +466,32 @@
 
       run_test_sh = env.ANDROID_BUILD_TOP + '/art/test/run-test'
       command = ' '.join((run_test_sh, options_test, ' '.join(extra_arguments[target]), test))
-
-      semaphore.acquire()
-      worker = threading.Thread(target=run_test, args=(command, test, variant_set, test_name))
-      worker.daemon = True
-      worker.start()
+      return executor.submit(run_test, command, test, variant_set, test_name)
 
   #  Use a context-manager to handle cleaning up the extracted zipapex if needed.
   with handle_zipapex(zipapex_loc) as zipapex_opt:
     options_all += zipapex_opt
-    for config_tuple in config:
-      target = config_tuple[1]
-      for address_size in _user_input_variants['address_sizes_target'][target]:
-        start_combination(config_tuple, options_all, address_size)
+    global n_thread
+    with concurrent.futures.ThreadPoolExecutor(max_workers=n_thread) as executor:
+      test_futures = []
+      for config_tuple in config:
+        target = config_tuple[1]
+        for address_size in _user_input_variants['address_sizes_target'][target]:
+          test_futures.append(start_combination(executor, config_tuple, options_all, address_size))
 
-    for config_tuple in uncombinated_config:
-        start_combination(config_tuple, options_all, "")  # no address size
+        for config_tuple in uncombinated_config:
+          test_futures.append(start_combination(executor, config_tuple, options_all, ""))  # no address size
 
-    while threading.active_count() > 2:
-      time.sleep(0.1)
+      tests_done = 0
+      for test_future in concurrent.futures.as_completed(test_futures):
+        (test, status, failure_info) = test_future.result()
+        tests_done += 1
+        print_test_info(tests_done, test, status, failure_info)
+        if failure_info and not env.ART_TEST_KEEP_GOING:
+          for f in test_futures:
+            f.cancel()
+          break
+      executor.shutdown(True)
 
 @contextlib.contextmanager
 def handle_zipapex(ziploc):
@@ -543,17 +518,16 @@
   passed, otherwise, put it in the list of failed test. Before actually running
   the test, it also checks if the test is placed in the list of disabled tests,
   and if yes, it skips running it, and adds the test in the list of skipped
-  tests. The method uses print_text method to actually print the output. After
-  successfully running and capturing the output for the test, it releases the
-  semaphore object.
+  tests.
 
   Args:
     command: The command to be used to invoke the script
     test: The name of the test without the variant information.
     test_variant: The set of variant for the test.
     test_name: The name of the test along with the variants.
+
+  Returns: a tuple of testname, status, and optional failure info.
   """
-  global stop_testrunner
   try:
     if is_test_disabled(test, test_variant):
       test_skipped = True
@@ -569,31 +543,23 @@
 
     if not test_skipped:
       if test_passed:
-        print_test_info(test_name, 'PASS')
+        return (test_name, 'PASS', None)
       else:
         failed_tests.append((test_name, str(command) + "\n" + script_output))
-        if not env.ART_TEST_KEEP_GOING:
-          stop_testrunner = True
-        print_test_info(test_name, 'FAIL', ('%s\n%s') % (
-          command, script_output))
+        return (test_name, 'FAIL', ('%s\n%s') % (command, script_output))
     elif not dry_run:
-      print_test_info(test_name, 'SKIP')
       skipped_tests.append(test_name)
+      return (test_name, 'SKIP', None)
     else:
-      print_test_info(test_name, '')
+      return (test_name, 'PASS', None)
   except subprocess.TimeoutExpired as e:
     failed_tests.append((test_name, 'Timed out in %d seconds' % timeout))
-    print_test_info(test_name, 'TIMEOUT', 'Timed out in %d seconds\n%s' % (
-        timeout, command))
+    return (test_name, 'TIMEOUT', 'Timed out in %d seconds\n%s' % (timeout, command))
   except Exception as e:
     failed_tests.append((test_name, str(e)))
-    print_test_info(test_name, 'FAIL',
-    ('%s\n%s\n\n') % (command, str(e)))
-  finally:
-    semaphore.release()
+    return (test_name, 'FAIL', ('%s\n%s\n\n') % (command, str(e)))
 
-
-def print_test_info(test_name, result, failed_test_info=""):
+def print_test_info(test_count, test_name, result, failed_test_info=""):
   """Print the continous test information
 
   If verbose is set to True, it continuously prints test status information
@@ -608,7 +574,6 @@
   test information in either of the cases.
   """
 
-  global test_count
   info = ''
   if not verbose:
     # Without --verbose, the testrunner erases passing test info. It
@@ -617,8 +582,6 @@
     console_width = int(os.popen('stty size', 'r').read().split()[1])
     info = '\r' + ' ' * console_width + '\r'
   try:
-    print_mutex.acquire()
-    test_count += 1
     percent = (test_count * 100) / total_test_count
     progress_info = ('[ %d%% %d/%d ]') % (
       percent,
@@ -666,8 +629,6 @@
   except Exception as e:
     print_text(('%s\n%s\n') % (test_name, str(e)))
     failed_tests.append(test_name)
-  finally:
-    print_mutex.release()
 
 def verify_knownfailure_entry(entry):
   supported_field = {
@@ -1050,28 +1011,16 @@
       if env.DIST_DIR:
         shutil.copyfile(env.SOONG_OUT_DIR + '/build.ninja', env.DIST_DIR + '/soong.ninja')
       sys.exit(1)
+
   if user_requested_tests:
-    test_runner_thread = threading.Thread(target=run_tests, args=(user_requested_tests,))
+    run_tests(user_requested_tests)
   else:
-    test_runner_thread = threading.Thread(target=run_tests, args=(RUN_TEST_SET,))
-  test_runner_thread.daemon = True
-  try:
-    test_runner_thread.start()
-    # This loops waits for all the threads to finish, unless
-    # stop_testrunner is set to True. When ART_TEST_KEEP_GOING
-    # is set to false, stop_testrunner is set to True as soon as
-    # a test fails to signal the parent thread  to stop
-    # the execution of the testrunner.
-    while threading.active_count() > 1 and not stop_testrunner:
-      time.sleep(0.1)
-    print_analysis()
-  except Exception as e:
-    print_analysis()
-    print_text(str(e))
-    sys.exit(1)
-  if failed_tests:
-    sys.exit(1)
-  sys.exit(0)
+    run_tests(RUN_TEST_SET)
+
+  print_analysis()
+
+  exit_code = 0 if len(failed_tests) == 0 else 1
+  sys.exit(exit_code)
 
 if __name__ == '__main__':
   main()