# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

5 6
from .result import SKIPPED

"""
Pipeline

Test processors are chained together and communicate with each other by
calling the previous/next processor in the chain.
     ----next_test()---->     ----next_test()---->
Proc1                    Proc2                    Proc3
     <---result_for()----     <---result_for()----

For every next_test there is exactly one result_for call.
If a processor ignores the test it has to return SkippedResult.
If it created multiple subtests for one test and wants to pass all of them to
the previous processor, it can enclose them in GroupedResult.


Subtests

When a test processor needs to modify the test or create some variants of the
test, it creates subtests and sends them to the next processor.
Each subtest has:
- procid - globally unique id that should contain the id of the parent test
          and some suffix given by the test processor, e.g. its name +
          subtest type.
- processor - the processor which created it
- origin - pointer to the parent (sub)test
"""


35 36 37 38 39 40
# Requirement levels passed along the processor chain by TestProc.setup().
# They are consecutive integers because setup() combines them with max(), so
# a larger value wins over a smaller one; DROP_RESULT (0) is the lowest level
# and the only one for which a processor replaces the whole result by None.
# NOTE(review): judging by the names, each following level keeps more of the
# test output than the previous one -- confirm against the output processor.
(DROP_RESULT,
 DROP_OUTPUT,
 DROP_PASS_OUTPUT,
 DROP_PASS_STDOUT) = range(4)


41 42 43 44
class TestProc(object):
  """Base class of a node in the test processor chain.

  Every processor keeps a link to its previous and its next neighbour. Tests
  are handed to the next processor with next_test() and results are handed
  back to the previous processor with result_for().
  """

  def __init__(self):
    self._prev_proc = None
    self._next_proc = None
    self._stopped = False
    # Requirement of this processor itself; setup() combines it with the
    # requirement received from the previous processor.
    self._requirement = DROP_RESULT
    # Requirement received from the previous processor; set by setup().
    self._prev_requirement = None
    # Applied by _send_result() to results of tests without keep_output.
    # Identity by default; setup() may swap it for a function returning None.
    self._reduce_result = lambda result: result

  def connect_to(self, next_proc):
    """Puts `next_proc` after itself in the chain."""
    next_proc._prev_proc = self
    self._next_proc = next_proc

  def remove_from_chain(self):
    """Links the neighbours of this processor directly to each other.

    The links stored in this processor itself are left untouched.
    """
    if self._prev_proc is not None:
      self._prev_proc._next_proc = self._next_proc
    if self._next_proc is not None:
      self._next_proc._prev_proc = self._prev_proc

  def setup(self, requirement=DROP_RESULT):
    """
    Method called by previous processor or processor pipeline creator to let
    the processors know what part of the result can be ignored.

    Args:
      requirement: DROP_* level requested by the caller. The next processor
          is set up with the maximum of this value and the processor's own
          requirement.
    """
    self._prev_requirement = requirement
    if self._next_proc is not None:
      self._next_proc.setup(max(requirement, self._requirement))

    # Since we're not winning anything by dropping part of the result we are
    # dropping the whole result or pass it as it is. The real reduction happens
    # during result creation (in the output processor), so the result is
    # immutable.
    if (self._prev_requirement < self._requirement and
        self._prev_requirement == DROP_RESULT):
      self._reduce_result = lambda _: None

  def next_test(self, test):
    """
    Method called by previous processor whenever it produces new test.
    This method shouldn't be called by anyone except previous processor.
    Returns a boolean value to signal whether the test was loaded into the
    execution queue successfully or not.
    """
    raise NotImplementedError()

  def result_for(self, test, result):
    """
    Method called by next processor whenever it has result for some test.
    This method shouldn't be called by anyone except next processor.
    """
    raise NotImplementedError()

  def heartbeat(self):
    """Forwards the heartbeat to the previous processor, if there is one."""
    if self._prev_proc is not None:
      self._prev_proc.heartbeat()

  def stop(self):
    """Marks this processor as stopped and stops both of its neighbours.

    The `_stopped` flag is set before propagating, so the call coming back
    from a neighbour returns immediately instead of recursing forever.
    """
    if self._stopped:
      return
    self._stopped = True
    if self._prev_proc is not None:
      self._prev_proc.stop()
    if self._next_proc is not None:
      self._next_proc.stop()

  @property
  def is_stopped(self):
    """True once stop() has been called on this processor."""
    return self._stopped

  ### Communication

  def notify_previous(self, event):
    """Handles `event` locally and passes it on to the previous processor."""
    self._on_event(event)
    if self._prev_proc is not None:
      self._prev_proc.notify_previous(event)

  def _on_event(self, event):
    """Called when processors to the right signal events, e.g. termination.

    Args:
      event: A text describing the signalled event.
    """
    pass

  def _send_test(self, test):
    """Helper method for sending test to the next processor.

    Returns whatever the next processor's next_test() returns.
    """
    return self._next_proc.next_test(test)

  def _send_result(self, test, result):
    """Helper method for sending result to the previous processor.

    Unless `test.keep_output` is set, the result first goes through
    `_reduce_result`, which setup() may have replaced by a function that
    drops the result (returns None).
    """
    if not test.keep_output:
      result = self._reduce_result(result)
    self._prev_proc.result_for(test, result)
134 135 136


class TestProcObserver(TestProc):
  """Processor used for observing the data.

  Tests, results and heartbeats are forwarded unchanged; subclasses hook in
  by overriding _on_next_test(), _on_result_for() and _on_heartbeat().
  """

  def __init__(self):
    super(TestProcObserver, self).__init__()

  def next_test(self, test):
    """Runs the _on_next_test() hook, then sends `test` to the next processor.

    Returns the value returned by the next processor.
    """
    self._on_next_test(test)
    return self._send_test(test)

  def result_for(self, test, result):
    """Runs the _on_result_for() hook, then sends the result backwards."""
    self._on_result_for(test, result)
    self._send_result(test, result)

  def heartbeat(self):
    """Runs the _on_heartbeat() hook, then forwards the heartbeat."""
    self._on_heartbeat()
    super(TestProcObserver, self).heartbeat()

  def _on_next_test(self, test):
    """Method called after receiving test from previous processor but before
    sending it to the next one."""
    pass

  def _on_result_for(self, test, result):
    """Method called after receiving result from next processor but before
    sending it to the previous one."""
    pass

  def _on_heartbeat(self):
    """Method called on every heartbeat before it is forwarded."""
    pass

166 167

class TestProcProducer(TestProc):
  """Processor for creating subtests."""

  def __init__(self, name):
    """
    Args:
      name: processor name, used as the prefix of the ids of the subtests
          created with _create_subtest().
    """
    super(TestProcProducer, self).__init__()
    self._name = name

  def next_test(self, test):
    """Delegates to _next_test() and returns its result."""
    return self._next_test(test)

  def result_for(self, subtest, result):
    """Delegates to _result_for(), passing `subtest.origin` as the test."""
    self._result_for(subtest.origin, subtest, result)

  ### Implementation
  def _next_test(self, test):
    """Implementation of next_test(); has to be provided by subclasses."""
    raise NotImplementedError()

  def _result_for(self, test, subtest, result):
    """
    result_for method extended with `subtest` parameter.

    Args:
      test: test used by current processor to create the subtest.
      subtest: test for which the `result` is.
      result: subtest execution result created by the output processor.
    """
    raise NotImplementedError()

  ### Managing subtests
  def _create_subtest(self, test, subtest_id, **kwargs):
    """Creates subtest with subtest id <processor name>-`subtest_id`.

    Extra keyword arguments are passed through to `test.create_subtest`.
    """
    full_id = '%s-%s' % (self._name, subtest_id)
    return test.create_subtest(self, full_id, **kwargs)
200

201 202 203 204 205

class TestProcFilter(TestProc):
  """Processor for filtering tests."""

  def next_test(self, test):
    """Sends `test` to the next processor unless _filter() rejects it.

    Returns False for a filtered test, which is not forwarded; otherwise
    returns the value returned by the next processor.
    """
    if self._filter(test):
      return False

    return self._send_test(test)

  def result_for(self, test, result):
    """Passes the result on to the previous processor."""
    self._send_result(test, result)

  def _filter(self, test):
    """Returns whether test should be filtered out."""
    raise NotImplementedError()