Commit 3742c845 authored by maruel@chromium.org's avatar maruel@chromium.org

Add the infrastructure necessary to support annotated stdout.

Simplify ExecutionQueue.run() by moving code into utility functions.

Reduce the amount of code in WorkerThread.run() to improve reliability.

Don't trap exceptions in single-threaded usage!

BUG=54084
TEST=none

Review URL: http://codereview.chromium.org/3336015

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@58974 0039d316-1c4b-4281-b951-d872f2087c98
parent 24b74202
......@@ -334,7 +334,7 @@ class Dependency(GClientKeywords, gclient_utils.WorkItem):
None, should_process))
logging.debug('Loaded: %s' % str(self))
def run(self, options, revision_overrides, command, args, work_queue):
def run(self, revision_overrides, command, args, work_queue, options):
"""Runs 'command' before parsing the DEPS in case it's a initial checkout
or a revert."""
assert self._file_list == []
......@@ -720,8 +720,7 @@ solutions = [
work_queue = gclient_utils.ExecutionQueue(self._options.jobs, pm)
for s in self.dependencies:
work_queue.enqueue(s)
work_queue.flush(self._options, revision_overrides, command, args,
work_queue)
work_queue.flush(revision_overrides, command, args, options=self._options)
# Once all the dependencies have been processed, it's now safe to run the
# hooks.
......@@ -765,7 +764,7 @@ solutions = [
work_queue = gclient_utils.ExecutionQueue(self._options.jobs, None)
for s in self.dependencies:
work_queue.enqueue(s)
work_queue.flush(self._options, {}, None, [], work_queue)
work_queue.flush({}, None, [], options=self._options)
def GetURLAndRev(dep):
"""Returns the revision-qualified SCM url for a Dependency."""
......
......@@ -14,9 +14,11 @@
"""Generic utils."""
import copy
import errno
import logging
import os
import Queue
import re
import stat
import subprocess
......@@ -400,7 +402,7 @@ def FindGclientRoot(from_dir, filename='.gclient'):
return path
path_to_check = os.path.dirname(path_to_check)
return None
logging.info('Found gclient root at ' + path)
return path
......@@ -455,7 +457,9 @@ class WorkItem(object):
# A unique string representing this work item.
name = None
def run(self):
def run(self, work_queue, options):
"""work_queue and options are passed as keyword arguments so they should be
the last parameters of the function when you override it."""
pass
......@@ -483,10 +487,11 @@ class ExecutionQueue(object):
# List of items currently running.
self.running = []
# Exceptions thrown if any.
self.exceptions = []
self.exceptions = Queue.Queue()
# Progress status
self.progress = progress
if self.progress:
self.progress.update()
self.progress.update(0)
def enqueue(self, d):
"""Enqueue one Dependency to be executed later once its requirements are
......@@ -507,21 +512,22 @@ class ExecutionQueue(object):
def flush(self, *args, **kwargs):
"""Runs all enqueued items until all are executed."""
kwargs['work_queue'] = self
self.ready_cond.acquire()
try:
while True:
# Check for task to run first, then wait.
while True:
if self.exceptions:
# Systematically flush the queue when there is an exception logged
# in.
if not self.exceptions.empty():
# Systematically flush the queue when an exception logged.
self.queued = []
# Flush threads that have terminated.
self.running = [t for t in self.running if t.isAlive()]
if not self.queued and not self.running:
break
if self.jobs == len(self.running):
self._flush_terminated_threads()
if (not self.queued and not self.running or
self.jobs == len(self.running)):
# No more worker threads or can't queue anything.
break
# Check for new tasks to start.
for i in xrange(len(self.queued)):
# Verify its requirements.
for r in self.queued[i].requirements:
......@@ -530,64 +536,88 @@ class ExecutionQueue(object):
break
else:
# Start one work item: all its requirements are satisfied.
d = self.queued.pop(i)
new_thread = self._Worker(self, d, args=args, kwargs=kwargs)
if self.jobs > 1:
# Start the thread.
self.running.append(new_thread)
new_thread.start()
else:
# Run the 'thread' inside the main thread.
new_thread.run()
self._run_one_task(self.queued.pop(i), args, kwargs)
break
else:
# Couldn't find an item that could run. Break out the outher loop.
break
if not self.queued and not self.running:
# We're done.
break
# We need to poll here otherwise Ctrl-C isn't processed.
self.ready_cond.wait(10)
# Something happened: self.enqueue() or a thread terminated. Loop again.
finally:
self.ready_cond.release()
assert not self.running, 'Now guaranteed to be single-threaded'
if self.exceptions:
if not self.exceptions.empty():
# To get back the stack location correctly, the raise a, b, c form must be
# used, passing a tuple as the first argument doesn't work.
e = self.exceptions.pop(0)
e = self.exceptions.get()
raise e[0], e[1], e[2]
if self.progress:
self.progress.end()
def _flush_terminated_threads(self):
"""Flush threads that have terminated."""
running = self.running
self.running = []
for t in running:
if t.isAlive():
self.running.append(t)
else:
t.join()
t.kwargs['options'].stdout.flush()
if self.progress:
self.progress.update(1)
assert not t.name in self.ran
if not t.name in self.ran:
self.ran.append(t.name)
def _run_one_task(self, task_item, args, kwargs):
if self.jobs > 1:
# Start the thread.
index = len(self.ran) + len(self.running) + 1
# Copy 'options' just to be safe.
task_kwargs = kwargs.copy()
task_kwargs['options'] = copy.copy(task_kwargs['options'])
new_thread = self._Worker(task_item, args, task_kwargs)
self.running.append(new_thread)
new_thread.start()
else:
# Run the 'thread' inside the main thread. Don't try to catch any
# exception.
task_item.run(*args, **kwargs)
self.ran.append(task_item.name)
if self.progress:
self.progress.update(1)
class _Worker(threading.Thread):
"""One thread to execute one WorkItem."""
def __init__(self, parent, item, args=(), kwargs=None):
def __init__(self, item, args, kwargs):
threading.Thread.__init__(self, name=item.name or 'Worker')
self.args = args
self.kwargs = kwargs or {}
logging.info(item.name)
self.item = item
self.parent = parent
self.args = args
self.kwargs = kwargs
def run(self):
"""Runs in its own thread."""
logging.debug('running(%s)' % self.item.name)
exception = None
work_queue = self.kwargs['work_queue']
try:
self.item.run(*self.args, **self.kwargs)
except Exception:
# Catch exception location.
exception = sys.exc_info()
logging.info('Caught exception in thread %s' % self.item.name)
logging.info(str(sys.exc_info()))
work_queue.exceptions.put(sys.exc_info())
logging.info('Task %s done' % self.item.name)
# This assumes the following code won't throw an exception. Bad.
self.parent.ready_cond.acquire()
work_queue.ready_cond.acquire()
try:
if exception:
self.parent.exceptions.append(exception)
if self.parent.progress:
self.parent.progress.update(1)
assert not self.item.name in self.parent.ran
if not self.item.name in self.parent.ran:
self.parent.ran.append(self.item.name)
work_queue.ready_cond.notifyAll()
finally:
self.parent.ready_cond.notifyAll()
self.parent.ready_cond.release()
work_queue.ready_cond.release()
......@@ -29,8 +29,8 @@ class GclientUtilsUnittest(GclientUtilBase):
'GetNodeNamedAttributeText', 'PathDifference', 'ParseXML', 'Popen',
'PrintableObject', 'RemoveDirectory', 'SplitUrlRevision',
'StdoutAutoFlush', 'SyntaxErrorToError', 'WorkItem',
'errno', 'logging', 'os', 're', 'stat', 'subprocess', 'sys',
'threading', 'time', 'xml',
'copy', 'errno', 'logging', 'os', 'Queue', 're', 'stat', 'subprocess',
'sys','threading', 'time', 'xml',
]
# If this test fails, you should add the relevant test.
self.compareMembers(gclient_utils, members)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment