Commit 9aa28daa authored by Liviu Rau, committed by V8 LUCI CQ

[test] Refactor testrunner (5)

 - Unify old Pool interface with the new context-related interface
 - Add single-threaded execution pool
 - Defer task killing back to OS context
 - Defer process listing in indicators back to OS context

Bug: v8:12785
Cq-Include-Trybots: luci.v8.try:v8_numfuzz_dbg_ng,v8_numfuzz_ng,v8_numfuzz_tsan_ng,v8_android_arm64_n5x_rel_ng
Change-Id: I8ffe01c5d567411203f69ecc451c718ff35d81c9
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/3781347
Reviewed-by: Alexander Schulze <alexschulze@chromium.org>
Commit-Queue: Liviu Rau <liviurau@google.com>
Cr-Commit-Position: refs/heads/main@{#82371}
parent 4e5757e7
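
The bullets in the commit message describe the interface unification at a high level; the sketch below shows how the pieces introduced in this diff are meant to fit together. It is an editorial illustration, not part of the commit: `options`, `jobs`, `requirement` and `handle` are placeholders.

    from testrunner.local.context import os_context

    # The context supplies the platform Command class, an execution pool and
    # OS-specific process helpers (list_processes / terminate_process).
    with os_context('linux', options) as ctx:
      ctx.pool.init(num_workers=4, notify_function=print)
      ctx.pool.add_jobs(jobs)
      for maybe_result in ctx.pool.results(requirement):
        if maybe_result.heartbeat:
          continue  # periodic keep-alive while workers are busy
        handle(maybe_result.value)
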
......@@ -2,9 +2,10 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from collections import OrderedDict, namedtuple
from functools import reduce
from os.path import dirname as up
from collections import OrderedDict, namedtuple
import json
import multiprocessing
import optparse
......@@ -13,17 +14,15 @@ import shlex
import sys
import traceback
from os.path import dirname as up
from testrunner.local import command
from testrunner.build_config import BuildConfig
from testrunner.local import testsuite
from testrunner.local import utils
from testrunner.local.context import os_context
from testrunner.test_config import TestConfig
from testrunner.testproc import util
from testrunner.testproc.indicators import PROGRESS_INDICATORS
from testrunner.testproc.sigproc import SignalProc
from testrunner.utils.augmented_options import AugmentedOptions
from testrunner.build_config import BuildConfig
DEFAULT_OUT_GN = 'out.gn'
......@@ -166,7 +165,7 @@ class BaseTestRunner(object):
args = self._parse_test_args(args)
with command.os_context(self.target_os, self.options) as ctx:
with os_context(self.target_os, self.options) as ctx:
names = self._args_to_suite_names(args)
tests = self._load_testsuite_generators(ctx, names)
self._setup_env()
......
......@@ -13,8 +13,7 @@ import time
from ..local.android import (Driver, CommandFailedException, TimeoutException)
from ..objects import output
from ..local.pool import DefaultExecutionPool, AbortException,\
taskkill_windows
from ..local.pool import AbortException
BASE_DIR = os.path.normpath(
os.path.join(os.path.dirname(os.path.abspath(__file__)), '..' , '..', '..'))
......@@ -208,6 +207,22 @@ class PosixCommand(BaseCommand):
os.killpg(process.pid, signal.SIGKILL)
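# Moved here from pool.py by this change: kills the whole process tree of the
# given process via the Windows taskkill command (/T = tree, /F = force).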
def taskkill_windows(process, verbose=False, force=True):
force_flag = ' /F' if force else ''
tk = subprocess.Popen(
'taskkill /T%s /PID %d' % (force_flag, process.pid),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = tk.communicate()
if verbose:
print('Taskkill results for %d' % process.pid)
print(stdout)
print(stderr)
print('Return code: %d' % tk.returncode)
sys.stdout.flush()
class WindowsCommand(BaseCommand):
def _start_process(self, **kwargs):
# Try to change the error mode to avoid dialogs on fatal errors. Don't
......@@ -312,51 +327,7 @@ class AndroidCommand(BaseCommand):
Command = None
class DefaultOSContext():
def __init__(self, command, pool=None):
self.command = command
self.pool = pool or DefaultExecutionPool()
@contextmanager
def context(self, options):
yield
class AndroidOSContext(DefaultOSContext):
def __init__(self):
super(AndroidOSContext, self).__init__(AndroidCommand)
@contextmanager
def context(self, options):
try:
AndroidCommand.driver = Driver.instance(options.device)
yield
finally:
AndroidCommand.driver.tear_down()
# TODO(liviurau): Add documentation with diagrams to describe how the context and
# its components get initialized and eventually torn down, and how they
# interact with both tests and underlying platform-specific concerns.
def find_os_context_factory(target_os):
registry = dict(
android=AndroidOSContext,
windows=lambda: DefaultOSContext(WindowsCommand))
default = lambda: DefaultOSContext(PosixCommand)
return registry.get(target_os, default)
@contextmanager
def os_context(target_os, options):
factory = find_os_context_factory(target_os)
context = factory()
with context.context(options):
yield context
# Deprecated : use os_context
# Deprecated : use context.os_context
def setup(target_os, device):
"""Set the Command class to the OS-specific version."""
global Command
......@@ -369,7 +340,7 @@ def setup(target_os, device):
Command = PosixCommand
# Deprecated : use os_context
# Deprecated : use context.os_context
def tear_down():
"""Clean up after using commands."""
if Command == AndroidCommand:
......
# Copyright 2022 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from contextlib import contextmanager
import os
import signal
import subprocess
import sys
from ..local.android import Driver
from .command import AndroidCommand, PosixCommand, WindowsCommand, taskkill_windows
from .pool import DefaultExecutionPool
from ..testproc.util import list_processes_linux
class DefaultOSContext:
def __init__(self, command, pool=None):
self.command = command
self.pool = pool or DefaultExecutionPool(self)
@contextmanager
def handle_context(self, options):
yield
def list_processes(self):
return []
def terminate_process(self, process):
pass
class LinuxContext(DefaultOSContext):
def __init__(self):
super().__init__(PosixCommand)
def list_processes(self):
return list_processes_linux()
def terminate_process(self, process):
os.kill(process.pid, signal.SIGTERM)
class WindowsContext(DefaultOSContext):
def __init__(self):
super().__init__(WindowsCommand)
def terminate_process(self, process):
taskkill_windows(process, verbose=True, force=False)
class AndroidOSContext(DefaultOSContext):
def __init__(self):
super().__init__(AndroidCommand)
@contextmanager
def handle_context(self, options):
try:
AndroidCommand.driver = Driver.instance(options.device)
yield
finally:
AndroidCommand.driver.tear_down()
# TODO(liviurau): Add documentation with diagrams to describe how the context and
# its components get initialized and eventually torn down, and how they
# interact with both tests and underlying platform-specific concerns.
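# Lifecycle sketch (editorial note, not authoritative) of the code below:
#   1. find_os_context_factory(target_os) picks a DefaultOSContext subclass.
#   2. os_context(target_os, options) instantiates it and enters
#      handle_context(options), which performs per-OS setup (e.g. attaching
#      the Android driver).
#   3. The context owns an execution pool and the process helpers
#      list_processes() / terminate_process() used by pool and indicators.
#   4. On exit, handle_context() tears the platform-specific state down again.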
def find_os_context_factory(target_os):
registry = dict(android=AndroidOSContext, windows=WindowsContext)
default = LinuxContext
return registry.get(target_os, default)
@contextmanager
def os_context(target_os, options):
factory = find_os_context_factory(target_os)
context_instance = factory()
with context_instance.handle_context(options):
yield context_instance
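
A hypothetical example of the per-OS delegation above: task killing and process listing are answered by the context rather than by platform checks inside the pool and the progress indicators. `proc` stands in for a running subprocess object with a `pid` attribute.

    from testrunner.local.context import find_os_context_factory

    windows_ctx = find_os_context_factory('windows')()  # -> WindowsContext
    windows_ctx.terminate_process(proc)                 # taskkill_windows(...)

    linux_ctx = find_os_context_factory('linux')()      # default: LinuxContext
    for pid, cmd in linux_ctx.list_processes():         # list_processes_linux()
      print(pid, cmd)
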
......@@ -6,13 +6,12 @@
import collections
import os
import signal
import subprocess
import traceback
from contextlib import contextmanager
from multiprocessing import Process, Queue
from queue import Empty
from . import utils
def setup_testing():
......@@ -32,22 +31,6 @@ def setup_testing():
Process.pid = property(lambda self: None)
def taskkill_windows(process, verbose=False, force=True):
force_flag = ' /F' if force else ''
tk = subprocess.Popen(
'taskkill /T%s /PID %d' % (force_flag, process.pid),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = tk.communicate()
if verbose:
print('Taskkill results for %d' % process.pid)
print(stdout)
print(stderr)
print('Return code: %d' % tk.returncode)
sys.stdout.flush()
class AbortException(Exception):
"""Indicates early abort on SIGINT, SIGTERM or internal hard timeout."""
pass
......@@ -116,29 +99,52 @@ def without_sig():
signal.signal(signal.SIGTERM, term_handler)
class Pool():
"""Distributes tasks to a number of worker processes.
New tasks can be added dynamically even after the workers have been started.
Requirement: Tasks can only be added from the parent process, e.g. while
consuming the results generator."""
class ContextPool():
# Factor to calculate the maximum number of items in the work/done queue.
# Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
BUFFER_FACTOR = 4
def __init__(self):
self.abort_now = False
def __init__(self, num_workers, heartbeat_timeout=1, notify_fun=None):
def init(self, num_workers, heartbeat_timeout=1, notify_function=None):
"""
Delayed initialization. At context creation time we have no access to the
parameters described below.
Args:
num_workers: Number of worker processes to run in parallel.
heartbeat_timeout: Timeout in seconds for waiting for results. Each time
the timeout is reached, a heartbeat is signalled and timeout is reset.
notify_fun: Callable called to signale some events like termination. The
notify_function: Callable called to signal some events like termination. The
event name is passed as string.
"""
self.num_workers = num_workers
pass
def add_jobs(self, jobs):
pass
def results(self, requirement):
pass
def abort(self):
self.abort_now = True
ProcessContext = collections.namedtuple('ProcessContext', ['result_reduction'])
class DefaultExecutionPool(ContextPool):
"""Distributes tasks to a number of worker processes.
New tasks can be added dynamically even after the workers have been started.
Requirement: Tasks can only be added from the parent process, e.g. while
consuming the results generator."""
# Factor to calculate the maximum number of items in the work/done queue.
# Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
BUFFER_FACTOR = 4
def __init__(self, os_context=None):
super(DefaultExecutionPool, self).__init__()
self.os_context = os_context
self.processes = []
self.terminated = False
self.abort_now = False
# Invariant: processing_count >= #work_queue + #done_queue. It is greater
# when a worker takes an item from the work_queue and before the result is
......@@ -148,8 +154,6 @@ class Pool():
# allowed to remove items from the done_queue and to add items to the
# work_queue.
self.processing_count = 0
self.heartbeat_timeout = heartbeat_timeout
self.notify = notify_fun or (lambda x: x)
# Disable sigint and sigterm to prevent subprocesses from capturing the
# signals.
......@@ -157,6 +161,30 @@ class Pool():
self.work_queue = Queue()
self.done_queue = Queue()
def init(self, num_workers=1, heartbeat_timeout=1, notify_function=None):
"""
Args:
num_workers: Number of worker processes to run in parallel.
heartbeat_timeout: Timeout in seconds for waiting for results. Each time
the timeout is reached, a heartbeat is signalled and timeout is reset.
notify_function: Callable called to signal some events like termination. The
event name is passed as string.
"""
self.num_workers = num_workers
self.heartbeat_timeout = heartbeat_timeout
self.notify = notify_function or (lambda x: x)
def add_jobs(self, jobs):
self.add(jobs)
def results(self, requirement):
return self.imap_unordered(
fn=run_job,
gen=[],
process_context_fn=ProcessContext,
process_context_args=[requirement],
)
def imap_unordered(self, fn, gen,
process_context_fn=None, process_context_args=None):
"""Maps function "fn" to items in generator "gen" on the worker processes
......@@ -256,10 +284,7 @@ class Pool():
def _terminate_processes(self):
for p in self.processes:
if utils.IsWindows():
taskkill_windows(p, verbose=True, force=False)
else:
os.kill(p.pid, signal.SIGTERM)
self.os_context.terminate_process(p)
def _terminate(self):
"""Terminates execution and cleans up the queues.
......@@ -323,30 +348,22 @@ class Pool():
return MaybeResult.create_heartbeat()
# Global function for multiprocessing, because pickling a static method doesn't
# work on Windows.
def run_job(job, process_context):
return job.run(process_context)
ProcessContext = collections.namedtuple('ProcessContext', ['result_reduction'])
class DefaultExecutionPool():
class SingleThreadedExecutionPool(ContextPool):
def init(self, jobs, notify_fun):
self._pool = Pool(jobs, notify_fun=notify_fun)
def __init__(self):
super(SingleThreadedExecutionPool, self).__init__()
self.work_queue = []
def add_jobs(self, jobs):
self._pool.add(jobs)
self.work_queue.extend(jobs)
def results(self, requirement):
return self._pool.imap_unordered(
fn=run_job,
gen=[],
process_context_fn=ProcessContext,
process_context_args=[requirement],
)
while self.work_queue and not self.abort_now:
job = self.work_queue.pop()
yield MaybeResult.create_result(job.run(ProcessContext(requirement)))
def abort(self):
self._pool.abort()
# Global function for multiprocessing, because pickling a static method doesn't
# work on Windows.
def run_job(job, process_context):
return job.run(process_context)
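
A hypothetical usage sketch for the single-threaded pool above, which implements the same ContextPool interface without spawning workers. `jobs` (objects with a run(process_context) method), `requirement` and `handle` are placeholders.

    pool = SingleThreadedExecutionPool()
    pool.init(1)  # accepted for interface compatibility; no workers are spawned
    pool.add_jobs(jobs)
    for maybe_result in pool.results(requirement):
      handle(maybe_result.value)  # each value arrives wrapped in a MaybeResult
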
......@@ -12,7 +12,7 @@ TOOLS_PATH = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(TOOLS_PATH)
from testrunner.local.pool import Pool
from testrunner.local.pool import DefaultExecutionPool
def Run(x):
......@@ -25,7 +25,8 @@ class PoolTest(unittest.TestCase):
def testNormal(self):
results = set()
pool = Pool(3)
pool = DefaultExecutionPool()
pool.init(3)
for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]):
if result.heartbeat:
# Any result can be a heartbeat due to timings.
......@@ -35,7 +36,8 @@ class PoolTest(unittest.TestCase):
def testException(self):
results = set()
pool = Pool(3)
pool = DefaultExecutionPool()
pool.init(3)
with self.assertRaises(Exception):
for result in pool.imap_unordered(Run, [[x] for x in range(0, 12)]):
if result.heartbeat:
......@@ -49,7 +51,8 @@ class PoolTest(unittest.TestCase):
def testAdd(self):
results = set()
pool = Pool(3)
pool = DefaultExecutionPool()
pool.init(3)
for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]):
if result.heartbeat:
# Any result can be a heartbeat due to timings.
......
......@@ -12,7 +12,8 @@ TOOLS_PATH = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(TOOLS_PATH)
from testrunner.local.command import DefaultOSContext, PosixCommand
from testrunner.local.command import PosixCommand
from testrunner.local.context import DefaultOSContext
from testrunner.local.testsuite import TestSuite
from testrunner.test_config import TestConfig
......
......@@ -151,7 +151,7 @@ class NumFuzzer(base_runner.BaseTestRunner):
results = ResultsTracker.create(self.options)
execproc = ExecutionProc(ctx, self.options.j)
sigproc = self._create_signal_proc()
progress = ProgressProc(self.options, self.framework_name,
progress = ProgressProc(ctx, self.options, self.framework_name,
tests.test_count_estimate)
procs = [
loader,
......
......@@ -294,7 +294,7 @@ class StandardTestRunner(base_runner.BaseTestRunner):
outproc_factory = predictable.get_outproc
execproc = ExecutionProc(ctx, jobs, outproc_factory)
sigproc = self._create_signal_proc()
progress = ProgressProc(self.options, self.framework_name,
progress = ProgressProc(ctx, self.options, self.framework_name,
tests.test_count_estimate)
procs = [
loader,
......
......@@ -230,24 +230,27 @@ class StandardRunnerTest(TestRunnerTest):
def testWithFakeContext(self):
with patch(
'testrunner.local.command.find_os_context_factory',
'testrunner.local.context.find_os_context_factory',
return_value=FakeOSContext):
result = self.run_tests(
'--progress=verbose',
'sweet/cherries',
'sweet',
)
result.stdout_includes('===>Starting stuff\n'
'>>> Running tests for x64.release\n'
'>>> Running with test processors\n')
result.stdout_includes('--- stdout ---\nfake stdout 1')
result.stdout_includes('--- stderr ---\nfake stderr 1')
result.stdout_includes('=== sweet/raspberries ===')
result.stdout_includes('=== sweet/cherries ===')
result.stdout_includes('=== sweet/apples ===')
result.stdout_includes('Command: fake_wrapper ')
result.stdout_includes(
'===\n'
'=== 1 tests failed\n'
'=== 4 tests failed\n'
'===\n'
'>>> 7 base tests produced 1 (14%) non-filtered tests\n'
'>>> 1 tests ran\n'
'>>> 7 base tests produced 7 (100%) non-filtered tests\n'
'>>> 7 tests ran\n'
'<===Stopping stuff\n')
def testSkips(self):
......
......@@ -31,7 +31,7 @@ class ExecutionProc(base.TestProc):
def __init__(self, ctx, jobs, outproc_factory=None):
super(ExecutionProc, self).__init__()
self.ctx = ctx
self.ctx.pool.init(jobs, notify_fun=self.notify_previous)
self.ctx.pool.init(jobs, notify_function=self.notify_previous)
self._outproc_factory = outproc_factory or (lambda t: t.output_proc)
self._tests = {}
......
......@@ -25,10 +25,11 @@ def print_failure_header(test, is_flaky=False):
class ProgressIndicator():
def __init__(self, options, test_count):
def __init__(self, context, options, test_count):
self.options = None
self.options = options
self._total = test_count
self.context = context
def on_test_result(self, test, result):
pass
......@@ -45,8 +46,8 @@ class ProgressIndicator():
class SimpleProgressIndicator(ProgressIndicator):
def __init__(self, options, test_count):
super(SimpleProgressIndicator, self).__init__(options, test_count)
def __init__(self, context, options, test_count):
super(SimpleProgressIndicator, self).__init__(context, options, test_count)
self._requirement = base.DROP_PASS_OUTPUT
self._failed = []
......@@ -96,8 +97,8 @@ class SimpleProgressIndicator(ProgressIndicator):
class StreamProgressIndicator(ProgressIndicator):
def __init__(self, options, test_count):
super(StreamProgressIndicator, self).__init__(options, test_count)
def __init__(self, context, options, test_count):
super(StreamProgressIndicator, self).__init__(context, options, test_count)
self._requirement = base.DROP_PASS_OUTPUT
def on_test_result(self, test, result):
......@@ -120,8 +121,8 @@ class StreamProgressIndicator(ProgressIndicator):
class VerboseProgressIndicator(SimpleProgressIndicator):
def __init__(self, options, test_count):
super(VerboseProgressIndicator, self).__init__(options, test_count)
def __init__(self, context, options, test_count):
super(VerboseProgressIndicator, self).__init__(context, options, test_count)
self._last_printed_time = time.time()
def _print(self, text):
......@@ -139,10 +140,11 @@ class VerboseProgressIndicator(SimpleProgressIndicator):
# TODO(machenbach): Remove this platform specific hack and implement a proper
# feedback channel from the workers, providing which tests are currently run.
def _print_processes_linux(self):
if platform.system() == 'Linux':
def _print_processes(self):
procs = self.context.list_processes()
if procs:
self._print('List of processes:')
for pid, cmd in util.list_processes_linux():
for pid, cmd in self.context.list_processes():
# Show command with pid, but other process info cut off.
self._print('pid: %d cmd: %s' % (pid, cmd))
......@@ -154,17 +156,17 @@ class VerboseProgressIndicator(SimpleProgressIndicator):
# Print something every 30 seconds to not get killed by an output
# timeout.
self._print('Still working...')
self._print_processes_linux()
self._print_processes()
def on_event(self, event):
self._print(event)
self._print_processes_linux()
self._print_processes()
class CIProgressIndicator(VerboseProgressIndicator):
def on_test_result(self, test, result):
super(VerboseProgressIndicator, self).on_test_result(test, result)
def on_test_result(self, context, test, result):
super(VerboseProgressIndicator, self).on_test_result(context, test, result)
if self.options.ci_test_completion:
with open(self.options.ci_test_completion, "a") as f:
f.write(self._message(test, result) + "\n")
......@@ -182,8 +184,8 @@ class CIProgressIndicator(VerboseProgressIndicator):
class DotsProgressIndicator(SimpleProgressIndicator):
def __init__(self, options, test_count):
super(DotsProgressIndicator, self).__init__(options, test_count)
def __init__(self, context, options, test_count):
super(DotsProgressIndicator, self).__init__(context, options, test_count)
self._count = 0
def on_test_result(self, test, result):
......@@ -209,8 +211,8 @@ class DotsProgressIndicator(SimpleProgressIndicator):
class CompactProgressIndicator(ProgressIndicator):
def __init__(self, options, test_count, templates):
super(CompactProgressIndicator, self).__init__(options, test_count)
def __init__(self, context, options, test_count, templates):
super(CompactProgressIndicator, self).__init__(context, options, test_count)
self._requirement = base.DROP_PASS_OUTPUT
self._templates = templates
......@@ -293,7 +295,7 @@ class CompactProgressIndicator(ProgressIndicator):
class ColorProgressIndicator(CompactProgressIndicator):
def __init__(self, options, test_count):
def __init__(self, context, options, test_count):
templates = {
'status_line': ("[%(mins)02i:%(secs)02i|"
"\033[34m%%%(progress) 4d\033[0m|"
......@@ -304,7 +306,8 @@ class ColorProgressIndicator(CompactProgressIndicator):
'failure': "\033[1;31m%s\033[0m",
'command': "\033[33m%s\033[0m",
}
super(ColorProgressIndicator, self).__init__(options, test_count, templates)
super(ColorProgressIndicator, self).__init__(context, options, test_count,
templates)
def printFormatted(self, format, string):
print(self._templates[format] % string)
......@@ -320,13 +323,13 @@ class ColorProgressIndicator(CompactProgressIndicator):
class MonochromeProgressIndicator(CompactProgressIndicator):
def __init__(self, options, test_count):
def __init__(self, context, options, test_count):
templates = {
'status_line': ("[%(mins)02i:%(secs)02i|%%%(progress) 4d|"
"+%(passed) 4d|-%(failed) 4d]: %(test)s"),
}
super(MonochromeProgressIndicator, self).__init__(options, test_count,
templates)
super(MonochromeProgressIndicator, self).__init__(context, options,
test_count, templates)
def printFormatted(self, format, string):
print(string)
......@@ -337,8 +340,9 @@ class MonochromeProgressIndicator(CompactProgressIndicator):
class JsonTestProgressIndicator(ProgressIndicator):
def __init__(self, options, test_count, framework_name):
super(JsonTestProgressIndicator, self).__init__(options, test_count)
def __init__(self, context, options, test_count, framework_name):
super(JsonTestProgressIndicator, self).__init__(context, options,
test_count)
self.tests = util.FixedSizeTopList(
self.options.slow_tests_cutoff, key=lambda rec: rec['duration'])
# We want to drop stdout/err for all passed tests on the first try, but we
......
......@@ -56,12 +56,16 @@ class ResultsTracker(base.TestProcObserver):
class ProgressProc(base.TestProcObserver):
def __init__(self, options, framework_name, test_count):
def __init__(self, context, options, framework_name, test_count):
super(ProgressProc, self).__init__()
self.procs = [PROGRESS_INDICATORS[options.progress](options, test_count)]
self.procs = [
PROGRESS_INDICATORS[options.progress](context, options, test_count)
]
if options.json_test_results:
self.procs.insert(
0, JsonTestProgressIndicator(options, test_count, framework_name))
0,
JsonTestProgressIndicator(context, options, test_count,
framework_name))
self._requirement = max(proc._requirement for proc in self.procs)
......
......@@ -16,8 +16,10 @@ from dataclasses import dataclass
from io import StringIO
from os.path import dirname as up
from testrunner.local.command import BaseCommand, DefaultOSContext
from testrunner.local.command import BaseCommand
from testrunner.objects import output
from testrunner.local.context import DefaultOSContext
from testrunner.local.pool import SingleThreadedExecutionPool
TOOLS_ROOT = up(up(up(os.path.abspath(__file__))))
sys.path.append(TOOLS_ROOT)
......@@ -189,10 +191,11 @@ class TestRunnerTest(unittest.TestCase):
class FakeOSContext(DefaultOSContext):
def __init__(self):
super(FakeOSContext, self).__init__(FakeCommand)
super(FakeOSContext, self).__init__(FakeCommand,
SingleThreadedExecutionPool())
@contextmanager
def context(self, device):
def handle_context(self, options):
print("===>Starting stuff")
yield
print("<===Stopping stuff")
......