pool.py 6.3 KB
Newer Older
1 2 3 4 5
#!/usr/bin/env python
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

6
from Queue import Empty
7
from multiprocessing import Event, Process, Queue
8 9
import traceback

10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29

class NormalResult():
  def __init__(self, result):
    self.result = result
    self.exception = False
    self.break_now = False


class ExceptionResult():
  def __init__(self):
    self.exception = True
    self.break_now = False


class BreakResult():
  def __init__(self):
    self.exception = False
    self.break_now = True


30 31 32 33 34 35 36 37 38 39 40 41 42 43
class MaybeResult():
  def __init__(self, heartbeat, value):
    self.heartbeat = heartbeat
    self.value = value

  @staticmethod
  def create_heartbeat():
    return MaybeResult(True, None)

  @staticmethod
  def create_result(value):
    return MaybeResult(False, value)


44 45
def Worker(fn, work_queue, done_queue, done,
           process_context_fn=None, process_context_args=None):
46 47 48 49
  """Worker to be run in a child process.
  The worker stops on two conditions. 1. When the poison pill "STOP" is
  reached or 2. when the event "done" is set."""
  try:
50 51 52
    kwargs = {}
    if process_context_fn and process_context_args is not None:
      kwargs.update(process_context=process_context_fn(*process_context_args))
53 54 55 56
    for args in iter(work_queue.get, "STOP"):
      if done.is_set():
        break
      try:
57
        done_queue.put(NormalResult(fn(*args, **kwargs)))
58
      except Exception, e:
59
        traceback.print_exc()
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult())
  except KeyboardInterrupt:
    done_queue.put(BreakResult())


class Pool():
  """Distributes tasks to a number of worker processes.
  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator."""

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
  BUFFER_FACTOR = 4

76
  def __init__(self, num_workers, heartbeat_timeout=30):
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
    self.num_workers = num_workers
    self.processes = []
    self.terminated = False

    # Invariant: count >= #work_queue + #done_queue. It is greater when a
    # worker takes an item from the work_queue and before the result is
    # submitted to the done_queue. It is equal when no worker is working,
    # e.g. when all workers have finished, and when no results are processed.
    # Count is only accessed by the parent process. Only the parent process is
    # allowed to remove items from the done_queue and to add items to the
    # work_queue.
    self.count = 0
    self.work_queue = Queue()
    self.done_queue = Queue()
    self.done = Event()
92
    self.heartbeat_timeout = heartbeat_timeout
93

94 95
  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
96 97
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
98 99 100
    the function. Returns a results iterator. A result value of type
    MaybeResult either indicates a heartbeat of the runner, i.e. indicating
    that the runner is still waiting for the result to be computed, or it wraps
101 102 103 104 105 106 107 108 109 110
    the real result.

    Args:
      process_context_fn: Function executed once by each worker. Expected to
          return a process-context object. If present, this object is passed
          as additional argument to each call to fn.
      process_context_args: List of arguments for the invocation of
          process_context_fn. All arguments will be pickled and sent beyond the
          process boundary.
    """
111
    try:
112
      internal_error = False
113 114 115 116 117 118 119
      gen = iter(gen)
      self.advance = self._advance_more

      for w in xrange(self.num_workers):
        p = Process(target=Worker, args=(fn,
                                         self.work_queue,
                                         self.done_queue,
120 121 122
                                         self.done,
                                         process_context_fn,
                                         process_context_args))
123 124 125 126 127
        self.processes.append(p)
        p.start()

      self.advance(gen)
      while self.count > 0:
128 129 130 131 132 133 134 135
        while True:
          try:
            result = self.done_queue.get(timeout=self.heartbeat_timeout)
            break
          except Empty:
            # Indicate a heartbeat. The iterator will continue fetching the
            # next result.
            yield MaybeResult.create_heartbeat()
136 137
        self.count -= 1
        if result.exception:
138 139 140
          # TODO(machenbach): Handle a few known types of internal errors
          # gracefully, e.g. missing test files.
          internal_error = True
141 142 143 144 145
          continue
        elif result.break_now:
          # A keyboard interrupt happened in one of the worker processes.
          raise KeyboardInterrupt
        else:
146
          yield MaybeResult.create_result(result.result)
147 148 149
        self.advance(gen)
    finally:
      self.terminate()
150 151
    if internal_error:
      raise Exception("Internal error in a worker process.")
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186

  def _advance_more(self, gen):
    while self.count < self.num_workers * self.BUFFER_FACTOR:
      try:
        self.work_queue.put(gen.next())
        self.count += 1
      except StopIteration:
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    self.work_queue.put(args)
    self.count += 1

  def terminate(self):
    if self.terminated:
      return
    self.terminated = True

    # For exceptional tear down set the "done" event to stop the workers before
    # they empty the queue buffer.
    self.done.set()

    for p in self.processes:
      # During normal tear down the workers block on get(). Feed a poison pill
      # per worker to make them stop.
      self.work_queue.put("STOP")

    for p in self.processes:
      p.join()
187 188 189 190 191 192 193 194 195 196

    # Drain the queues to prevent failures when queues are garbage collected.
    try:
      while True: self.work_queue.get(False)
    except:
      pass
    try:
      while True: self.done_queue.get(False)
    except:
      pass