Commit 9e5317ac authored by maruel@chromium.org's avatar maruel@chromium.org

Add --jobs support to gclient. --jobs=1 is still the default for now.

Huge thanks to piman@ for working on a patch. I chose a different design but he
gave me motivation and ideas. Sorry for not accepting his patch earlier, this
was mostly due to broken gclient implementation itself.

gclient can now run an unlimited number of parallel checkouts and always keep
the checkout coherency correct.

--jobs=1 is single threaded as before, albeit with a different code path.

Issues:
- Using --jobs with a value other than 1 will result in a mangled output.
- Exceptions thrown in a thread will have the wrong stack trace.

TEST=gclient sync -j 99 in a ssh:// chromiumos checkout is dramatically faster.

---

Here's the perf on linux on i7-860 for a chromium checkout with warm cache. Cold
cache will result in significantly reduced improvements so this is the best-case
improvement. The sync was a no-op all the time except where noted. All executions
were with "time gclient sync " + args. Didn't include the 'sys' column since it was
statistically insignificant and highly correlated with 'user'.

           runs with -f        runs with -m      without -f nor -m
 args          real    user      real    user      real    user
 -j 12       20.59s  18.00s     5.64s   7.95s     5.86s   8.10s
        #1 1m05.26s  20.02s     5.20s   7.94s     5.10s   8.09s
             22.79s  18.17s
 -j 1   #2 1m47.00s  16.72s     9.69s   5.72s    12.35s   5.96s
           1m31.28s  17.06s     9.54s   5.85s    10.51s   6.20s
           1m31.79s  16.39s
 before #3 1m30.94s  16.74s     9.77s   5.83s    10.45s   5.77s
           1m30.17s  17.30s    10.36s   5.68s    10.16s   5.88s
 hook #4      8.52s  7.93s
              8.73s  8.13s

#1 This particular run synched to r56023, a webkit roll updating layout tests.
   It's still faster than a no-op sync without parallel checkout.
#2 Maybe there was a sync or computer hiccup, I didn't realize.
#3 This is depot_tools@56020
#4 Since -f implies runhooks, I ran the hook 'python src/build/gyp_chromium'
   manually to compare. Hooks are still run in a single thread. I didn't test
   'gclient runhooks'.

I tried to do an ssh:// checkout of the chromium os tree but it timed out every
time I tried to sync, so I couldn't get data points. I expect an order of
magnitude of improvement or more.

Review URL: http://codereview.chromium.org/3135014

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@56079 0039d316-1c4b-4281-b951-d872f2087c98
parent 80cbe8b7
......@@ -49,8 +49,9 @@ Hooks
]
"""
__version__ = "0.5.2"
__version__ = "0.6"
import copy
import logging
import optparse
import os
......@@ -355,12 +356,13 @@ class Dependency(GClientKeywords, gclient_utils.WorkItem):
args + [self.parsed_url.GetFilename()],
self._file_list)
else:
# Create a shallow copy to mutate revision.
options = copy.copy(options)
options.revision = revision_overrides.get(self.name)
scm = gclient_scm.CreateSCM(self.parsed_url, self.root_dir(), self.name)
scm.RunCommand(command, options, args, self._file_list)
self._file_list = [os.path.join(self.name, f.strip())
for f in self._file_list]
options.revision = None
self.processed = True
if self.recursion_limit() > 0:
# Then we can parse the DEPS file.
......@@ -710,7 +712,7 @@ solutions = [
pm = None
if command == 'update' and not self._options.verbose:
pm = Progress('Syncing projects', 1)
work_queue = gclient_utils.ExecutionQueue(pm)
work_queue = gclient_utils.ExecutionQueue(self._options.jobs, pm)
for s in self.dependencies:
work_queue.enqueue(s)
work_queue.flush(self._options, revision_overrides, command, args,
......@@ -755,7 +757,7 @@ solutions = [
if not self.dependencies:
raise gclient_utils.Error('No solution specified')
# Load all the settings.
work_queue = gclient_utils.ExecutionQueue(None)
work_queue = gclient_utils.ExecutionQueue(self._options.jobs, None)
for s in self.dependencies:
work_queue.enqueue(s)
work_queue.flush(self._options, {}, None, [], work_queue)
......@@ -1168,6 +1170,8 @@ def Main(argv):
' %-10s %s' % (fn[3:], Command(fn[3:]).__doc__.split('\n')[0].strip())
for fn in dir(sys.modules[__name__]) if fn.startswith('CMD')]))
parser = optparse.OptionParser(version='%prog ' + __version__)
parser.add_option('-j', '--jobs', default=1, type='int',
help='Specify how many SCM commands can run in parallel')
parser.add_option('-v', '--verbose', action='count', default=0,
help='Produces additional output for diagnostics. Can be '
'used up to three times for more logging info.')
......@@ -1186,6 +1190,8 @@ def Main(argv):
logging.basicConfig(level=level,
format='%(module)s(%(lineno)d) %(funcName)s:%(message)s')
options.entries_filename = options.config_filename + '_entries'
if options.jobs < 1:
parser.error('--jobs must be 1 or higher')
# These hacks need to die.
if not hasattr(options, 'revisions'):
......
......@@ -21,6 +21,7 @@ import re
import stat
import subprocess
import sys
import threading
import time
import threading
import xml.dom.minidom
......@@ -378,21 +379,30 @@ class WorkItem(object):
class ExecutionQueue(object):
"""Dependencies sometime needs to be run out of order due to From() keyword.
"""Runs a set of WorkItem that have interdependencies and were WorkItem are
added as they are processed.
This class manages that all the required dependencies are run before running
each one.
In gclient's case, Dependencies sometime needs to be run out of order due to
From() keyword. This class manages that all the required dependencies are run
before running each one.
Methods of this class are multithread safe.
Methods of this class are thread safe.
"""
def __init__(self, progress):
self.lock = threading.Lock()
# List of WorkItem, Dependency inherits from WorkItem.
def __init__(self, jobs, progress):
"""jobs specifies the number of concurrent tasks to allow. progress is a
Progress instance."""
# Set when a thread is done or a new item is enqueued.
self.ready_cond = threading.Condition()
# Maximum number of concurrent tasks.
self.jobs = jobs
# List of WorkItem, for gclient, these are Dependency instances.
self.queued = []
# List of strings representing each Dependency.name that was run.
self.ran = []
# List of items currently running.
self.running = []
# Exceptions thrown if any.
self.exceptions = []
self.progress = progress
if self.progress:
self.progress.update()
......@@ -402,71 +412,99 @@ class ExecutionQueue(object):
satisfied.
"""
assert isinstance(d, WorkItem)
self.ready_cond.acquire()
try:
self.lock.acquire()
self.queued.append(d)
total = len(self.queued) + len(self.ran) + len(self.running)
logging.debug('enqueued(%s)' % d.name)
if self.progress:
self.progress._total = total + 1
self.progress.update(0)
self.ready_cond.notifyAll()
finally:
self.lock.release()
if self.progress:
self.progress._total = total + 1
self.progress.update(0)
self.ready_cond.release()
def flush(self, *args, **kwargs):
"""Runs all enqueued items until all are executed."""
while self._run_one_item(*args, **kwargs):
pass
queued = []
running = []
self.ready_cond.acquire()
try:
self.lock.acquire()
if self.queued:
queued = self.queued
self.queued = []
if self.running:
running = self.running
self.running = []
while True:
# Check for task to run first, then wait.
while True:
if self.exceptions:
# Systematically flush the queue when there is an exception logged
# in.
self.queued = []
# Flush threads that have terminated.
self.running = [t for t in self.running if t.isAlive()]
if not self.queued and not self.running:
break
if self.jobs == len(self.running):
break
for i in xrange(len(self.queued)):
# Verify its requirements.
for r in self.queued[i].requirements:
if not r in self.ran:
# Requirement not met.
break
else:
# Start one work item: all its requirements are satisfied.
d = self.queued.pop(i)
new_thread = self._Worker(self, d, args=args, kwargs=kwargs)
if self.jobs > 1:
# Start the thread.
self.running.append(new_thread)
new_thread.start()
else:
# Run the 'thread' inside the main thread.
new_thread.run()
break
else:
# Couldn't find an item that could run. Break out of the outer loop.
break
if not self.queued and not self.running:
break
# We need to poll here otherwise Ctrl-C isn't processed.
self.ready_cond.wait(10)
# Something happened: self.enqueue() or a thread terminated. Loop again.
finally:
self.lock.release()
self.ready_cond.release()
assert not self.running, 'Now guaranteed to be single-threaded'
if self.exceptions:
# TODO(maruel): Get back the original stack location.
raise self.exceptions.pop(0)
if self.progress:
self.progress.end()
if queued:
raise gclient_utils.Error('Entries still queued: %s' % str(queued))
if running:
raise gclient_utils.Error('Entries still queued: %s' % str(running))
def _run_one_item(self, *args, **kwargs):
"""Removes one item from the queue that has all its requirements completed
and execute it.
class _Worker(threading.Thread):
"""One thread to execute one WorkItem."""
def __init__(self, parent, item, args=(), kwargs=None):
threading.Thread.__init__(self, name=item.name or 'Worker')
self.args = args
self.kwargs = kwargs or {}
self.item = item
self.parent = parent
def run(self):
"""Runs in its own thread."""
logging.debug('running(%s)' % self.item.name)
exception = None
try:
self.item.run(*self.args, **self.kwargs)
except Exception, e:
# TODO(maruel): Catch exception location.
exception = e
Returns False if no item could be run.
"""
i = 0
d = None
try:
self.lock.acquire()
while i != len(self.queued) and not d:
d = self.queued.pop(i)
for r in d.requirements:
if not r in self.ran:
self.queued.insert(i, d)
d = None
break
i += 1
if not d:
return False
self.running.append(d)
finally:
self.lock.release()
d.run(*args, **kwargs)
try:
self.lock.acquire()
assert not d.name in self.ran
if not d.name in self.ran:
self.ran.append(d.name)
self.running.remove(d)
if self.progress:
self.progress.update(1)
finally:
self.lock.release()
return True
# This assumes the following code won't throw an exception. Bad.
self.parent.ready_cond.acquire()
try:
if exception:
self.parent.exceptions.append(exception)
if self.parent.progress:
self.parent.progress.update(1)
assert not self.item.name in self.parent.ran
if not self.item.name in self.parent.ran:
self.parent.ran.append(self.item.name)
finally:
self.parent.ready_cond.notifyAll()
self.parent.ready_cond.release()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment