Commit 8b5086d8 authored by piman@chromium.org

Add -j option to gclient to run parallel updates

On a chromeos checkout, -j 10 brings the null-sync time down from 2 minutes to 16 seconds.
Currently -j may break some assumptions about the ordering of commands (which may be fine for some clients, but not all), so it is not on by default.
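
For example (a usage sketch; "sync" is just one of the commands that runs SCM operations):

    gclient sync -j 10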

Review URL: http://codereview.chromium.org/1640001

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@44869 0039d316-1c4b-4281-b951-d872f2087c98
parent f43d019e
gclient.py
@@ -58,6 +58,7 @@ Hooks
 __author__ = "darinf@gmail.com (Darin Fisher)"
 __version__ = "0.3.4"
 
+import copy
 import errno
 import logging
 import optparse
@@ -698,6 +699,17 @@ class GClient(object):
       if matching_file_list:
         self._RunHookAction(hook_dict, matching_file_list)
 
+  def GetSCMCommandClosure(self, path, url, revision, command, args, file_list):
+    """Gets a closure that runs a SCM command on a particular dependency."""
+    def _Closure():
+      logging.debug("Running %s in %s to %s %s" % (command, path, url,
+                                                   revision))
+      options = copy.copy(self._options)
+      options.revision = revision
+      scm = gclient_scm.CreateSCM(url, self._root_dir, path)
+      scm.RunCommand(command, options, args, file_list)
+    return _Closure
+
   def RunOnDeps(self, command, args):
     """Runs a command on each dependency in a client and its dependencies.
@@ -738,84 +750,125 @@ class GClient(object):
     entries = {}
     entries_deps_content = {}
-    file_list = []
-    # Run on the base solutions first.
-    for solution in solutions:
-      name = solution["name"]
-      deps_file = solution.get("deps_file", self._options.deps_file)
-      if '/' in deps_file or '\\' in deps_file:
-        raise gclient_utils.Error('deps_file name must not be a path, just a '
-                                  'filename.')
-      if name in entries:
-        raise gclient_utils.Error("solution %s specified more than once" % name)
-      url = solution["url"]
-      entries[name] = url
-      if run_scm and url:
-        self._options.revision = revision_overrides.get(name)
-        scm = gclient_scm.CreateSCM(url, self._root_dir, name)
-        scm.RunCommand(command, self._options, args, file_list)
-        file_list = [os.path.join(name, f.strip()) for f in file_list]
-        self._options.revision = None
-      try:
-        deps_content = gclient_utils.FileRead(
-            os.path.join(self._root_dir, name, deps_file))
-      except IOError, e:
-        if e.errno != errno.ENOENT:
-          raise
-        deps_content = ""
-      entries_deps_content[name] = deps_content
-
-    # Process the dependencies next (sort alphanumerically to ensure that
-    # containing directories get populated first and for readability)
-    deps = self._ParseAllDeps(entries, entries_deps_content)
-    deps_to_process = deps.keys()
-    deps_to_process.sort()
-
-    # First pass for direct dependencies.
-    if command == 'update' and not self._options.verbose:
-      pm = Progress('Syncing projects', len(deps_to_process))
-    for d in deps_to_process:
-      if command == 'update' and not self._options.verbose:
-        pm.update()
-      if type(deps[d]) == str:
-        url = deps[d]
-        entries[d] = url
-        if run_scm:
-          self._options.revision = revision_overrides.get(d)
-          scm = gclient_scm.CreateSCM(url, self._root_dir, d)
-          scm.RunCommand(command, self._options, args, file_list)
-          self._options.revision = None
-      elif isinstance(deps[d], self.FileImpl):
-        file = deps[d]
-        self._options.revision = file.GetRevision()
-        if run_scm:
-          scm = gclient_scm.CreateSCM(file.GetPath(), self._root_dir, d)
-          scm.RunCommand("updatesingle", self._options,
-                         args + [file.GetFilename()], file_list)
-
-    if command == 'update' and not self._options.verbose:
-      pm.end()
-
-    # Second pass for inherited deps (via the From keyword)
-    for d in deps_to_process:
-      if isinstance(deps[d], self.FromImpl):
-        filename = os.path.join(self._root_dir,
-                                deps[d].module_name,
-                                self._options.deps_file)
-        content = gclient_utils.FileRead(filename)
-        sub_deps = self._ParseSolutionDeps(deps[d].module_name, content, {},
-                                           False)
-        # Getting the URL from the sub_deps file can involve having to resolve
-        # a File() or having to resolve a relative URL. To resolve relative
-        # URLs, we need to pass in the orignal sub deps URL.
-        sub_deps_base_url = deps[deps[d].module_name]
-        url = deps[d].GetUrl(d, sub_deps_base_url, self._root_dir, sub_deps)
-        entries[d] = url
-        if run_scm:
-          self._options.revision = revision_overrides.get(d)
-          scm = gclient_scm.CreateSCM(url, self._root_dir, d)
-          scm.RunCommand(command, self._options, args, file_list)
-          self._options.revision = None
+    # To avoid threading issues, all file lists get constructed separately then
+    # gathered in a flattened list at the end.
+    file_list_list = []
+    file_list_dict = {}
+
+    thread_pool = gclient_utils.ThreadPool(self._options.jobs)
+    thread_pool.Start()
+
+    try:
+      # Run on the base solutions first.
+      for solution in solutions:
+        name = solution["name"]
+        deps_file = solution.get("deps_file", self._options.deps_file)
+        if '/' in deps_file or '\\' in deps_file:
+          raise gclient_utils.Error('deps_file name must not be a path, just a '
+                                    'filename.')
+        if name in entries:
+          raise gclient_utils.Error(
+              "solution %s specified more than once" % name)
+        url = solution["url"]
+        entries[name] = url
+        if run_scm and url:
+          revision = revision_overrides.get(name)
+          file_list = []
+          file_list_dict[name] = file_list
+          thread_pool.AddJob(self.GetSCMCommandClosure(
+              name, url, revision, command, args, file_list))
+
+      thread_pool.WaitJobs()
+
+      for solution in solutions:
+        name = solution["name"]
+        deps_file = solution.get("deps_file", self._options.deps_file)
+        try:
+          deps_content = gclient_utils.FileRead(
+              os.path.join(self._root_dir, name, deps_file))
+        except IOError, e:
+          if e.errno != errno.ENOENT:
+            raise
+          deps_content = ""
+        entries_deps_content[name] = deps_content
+        try:
+          file_list_list.append([os.path.join(name, f.strip())
+                                 for f in file_list_dict[name]])
+        except KeyError:
+          # We may not have added the file list to the dict, see tests above.
+          # Instead of duplicating the tests, it's less fragile to just ignore
+          # the exception.
+          pass
+
+      # Process the dependencies next (sort alphanumerically to ensure that
+      # containing directories get populated first and for readability)
+      # TODO(piman): when using multiple threads, the ordering is not ensured.
+      # In many cases (e.g. updates to an existing checkout where DEPS don't
+      # move between directories), it'll still be correct but for completeness
+      # this should be fixed.
+      deps = self._ParseAllDeps(entries, entries_deps_content)
+      deps_to_process = deps.keys()
+      deps_to_process.sort()
+
+      # First pass for direct dependencies.
+      if command == 'update' and not self._options.verbose:
+        pm = Progress('Syncing projects', len(deps_to_process))
+
+      for d in deps_to_process:
+        if command == 'update' and not self._options.verbose:
+          pm.update()
+        file_list = []
+        file_list_list.append(file_list)
+        if type(deps[d]) == str:
+          url = deps[d]
+          entries[d] = url
+          if run_scm:
+            revision = revision_overrides.get(d)
+            thread_pool.AddJob(self.GetSCMCommandClosure(d, url, revision,
+                                                         command, args,
+                                                         file_list))
+        elif isinstance(deps[d], self.FileImpl):
+          file = deps[d]
+          if run_scm:
+            revision = file.GetRevision()
+            thread_pool.AddJob(self.GetSCMCommandClosure(
+                d, url, revision, "updatesingle", args + [file.GetFilename()],
+                file_list))
+
+      thread_pool.WaitJobs()
+
+      if command == 'update' and not self._options.verbose:
+        pm.end()
+
+      # Second pass for inherited deps (via the From keyword)
+      for d in deps_to_process:
+        if isinstance(deps[d], self.FromImpl):
+          filename = os.path.join(self._root_dir,
+                                  deps[d].module_name,
+                                  self._options.deps_file)
+          content = gclient_utils.FileRead(filename)
+          sub_deps = self._ParseSolutionDeps(deps[d].module_name, content, {},
+                                             False)
+          # Getting the URL from the sub_deps file can involve having to resolve
+          # a File() or having to resolve a relative URL. To resolve relative
+          # URLs, we need to pass in the orignal sub deps URL.
+          sub_deps_base_url = deps[deps[d].module_name]
+          url = deps[d].GetUrl(d, sub_deps_base_url, self._root_dir, sub_deps)
+          entries[d] = url
+          if run_scm:
+            revision = revision_overrides.get(d)
+            file_list = []
+            file_list_list.append(file_list)
+            thread_pool.AddJob(self.GetSCMCommandClosure(d, url, revision,
+                                                         command, args,
+                                                         file_list))
+
+      thread_pool.WaitJobs()
+    finally:
+      thread_pool.Stop()
+
+    file_list = sum(file_list_list, [])
 
     # Convert all absolute paths to relative.
     for i in range(len(file_list)):
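
The file_list handling above is the other half of the threading story: each
job appends to a private list, and the main thread flattens them with
sum(file_list_list, []) once the pool is idle. A toy sketch of the gather
pattern (paths made up for illustration):

    # One private list per job; no lock is needed because no list is ever
    # shared between two jobs.
    file_list_list = []
    for name in ["base", "net", "tools"]:
      file_list = []
      file_list_list.append(file_list)
      file_list.append("%s/README" % name)  # stand-in for SCM-reported files

    # After WaitJobs(), the main thread flattens everything in one pass.
    file_list = sum(file_list_list, [])
    print file_list  # ['base/README', 'net/README', 'tools/README']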
@@ -1284,6 +1337,9 @@ def Main(argv):
   option_parser.add_option("", "--gclientfile", default=None,
                            metavar="FILENAME",
                            help=("specify an alternate .gclient file"))
+  option_parser.add_option("-j", "--jobs", default=1, type="int",
+                           help=("specify how many SCM commands can run in "
+                                 "parallel"))
   if len(argv) < 2:
     # Users don't need to be told to use the 'help' command.
gclient_utils.py
@@ -17,11 +17,14 @@
 import errno
 import logging
 import os
+import Queue
 import re
 import stat
 import subprocess
 import sys
 import time
+import threading
+import traceback
 import xml.dom.minidom
 import xml.parsers.expat
@@ -353,3 +356,80 @@ def GetGClientRootAndEntries(path=None):
   execfile(config_path, env)
   config_dir = os.path.dirname(config_path)
   return config_dir, env['entries']
+
+
+class ThreadPool:
+  """A thread pool class that lets one schedule jobs on many worker threads."""
+  def __init__(self, threads=1):
+    self._threads = threads
+    self._queue = Queue.Queue()
+    self._workers = []
+
+  class Worker(threading.Thread):
+    """Internal worker class that executes jobs from the ThreadPool queue."""
+    def __init__(self, pool):
+      threading.Thread.__init__(self)
+      self._pool = pool
+      self._done = False
+      self.exceptions = []
+
+    def Done(self):
+      """Terminates the worker thread."""
+      self._done = True
+
+    def run(self):
+      """Executes jobs from the pool's queue."""
+      while not self._done:
+        f = self._pool._queue.get()
+        try:
+          f(self)
+        except Exception, e:
+          # Catch all exceptions, otherwise we can't join the thread. Print the
+          # backtrace now, but keep the exception so that we can raise it on the
+          # main thread.
+          type, value, tb = sys.exc_info()
+          traceback.print_exception(type, value, tb)
+          self.exceptions.append(e)
+        finally:
+          self._pool._queue.task_done()
+
+  def Start(self):
+    """Starts the thread pool. Spawns worker threads."""
+    assert not self._workers
+    for i in xrange(0, self._threads):
+      worker = self.Worker(self)
+      self._workers.append(worker)
+      worker.start()
+
+  def Stop(self):
+    """Stops the thread pool. Joins all worker threads."""
+    assert self._workers
+    for i in xrange(0, len(self._workers)):
+      wrapped = lambda thread: thread.Done()
+      self._queue.put(wrapped)
+    self._queue.join()
+    for worker in self._workers:
+      worker.join()
+    try:
+      for worker in self._workers:
+        for e in worker.exceptions:
+          # If we collected exceptions, raise them now.
+          raise e
+    finally:
+      self._workers = []
+
+  def AddJob(self, function):
+    """Adds a job to the queue.
+
+    A job is a simple closure that will get executed on one of the worker
+    threads."""
+    wrapped = lambda worker: function()
+    self._queue.put(wrapped)
+
+  def WaitJobs(self):
+    """Waits for all jobs to be completed."""
+    assert self._workers
+    self._queue.join()
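
A usage sketch of the new ThreadPool, mirroring the Start/AddJob/WaitJobs/Stop
sequence that RunOnDeps drives (the job bodies here are made up):

    import gclient_utils

    def _MakeJob(n):
      def _Job():
        print "running job %d" % n
      return _Job

    pool = gclient_utils.ThreadPool(4)
    pool.Start()
    try:
      for n in xrange(10):
        pool.AddJob(_MakeJob(n))
      pool.WaitJobs()  # blocks until the queue drains
    finally:
      pool.Stop()      # joins the workers; re-raises any collected exceptions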