Commit 80cbe8b7 authored by maruel@chromium.org's avatar maruel@chromium.org

Move ExecutionQueue and WorkItem to gclient_utils.py

Review URL: http://codereview.chromium.org/3107009

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@56020 0039d316-1c4b-4281-b951-d872f2087c98
parent ce464894
......@@ -59,7 +59,6 @@ import pprint
import re
import subprocess
import sys
import threading
import urlparse
import urllib
......@@ -80,110 +79,6 @@ def attr(attr, data):
## GClient implementation.
class WorkItem(object):
"""One work item."""
requirements = []
name = None
def run(self):
pass
class ExecutionQueue(object):
"""Dependencies sometime needs to be run out of order due to From() keyword.
This class manages that all the required dependencies are run before running
each one.
Methods of this class are multithread safe.
"""
def __init__(self, progress):
self.lock = threading.Lock()
# List of Dependency.
self.queued = []
# List of strings representing each Dependency.name that was run.
self.ran = []
# List of items currently running.
self.running = []
self.progress = progress
if self.progress:
self.progress.update()
def enqueue(self, d):
"""Enqueue one Dependency to be executed later once its requirements are
satisfied.
"""
assert isinstance(d, WorkItem)
try:
self.lock.acquire()
self.queued.append(d)
total = len(self.queued) + len(self.ran) + len(self.running)
finally:
self.lock.release()
if self.progress:
self.progress._total = total + 1
self.progress.update(0)
def flush(self, *args, **kwargs):
"""Runs all enqueued items until all are executed."""
while self._run_one_item(*args, **kwargs):
pass
queued = []
running = []
try:
self.lock.acquire()
if self.queued:
queued = self.queued
self.queued = []
if self.running:
running = self.running
self.running = []
finally:
self.lock.release()
if self.progress:
self.progress.end()
if queued:
raise gclient_utils.Error('Entries still queued: %s' % str(queued))
if running:
raise gclient_utils.Error('Entries still queued: %s' % str(running))
def _run_one_item(self, *args, **kwargs):
"""Removes one item from the queue that has all its requirements completed
and execute it.
Returns False if no item could be run.
"""
i = 0
d = None
try:
self.lock.acquire()
while i != len(self.queued) and not d:
d = self.queued.pop(i)
for r in d.requirements:
if not r in self.ran:
self.queued.insert(i, d)
d = None
break
i += 1
if not d:
return False
self.running.append(d)
finally:
self.lock.release()
d.run(*args, **kwargs)
try:
self.lock.acquire()
# TODO(maruel): http://crbug.com/51711
#assert not d.name in self.ran
if not d.name in self.ran:
self.ran.append(d.name)
self.running.remove(d)
if self.progress:
self.progress.update(1)
finally:
self.lock.release()
return True
class GClientKeywords(object):
class FromImpl(object):
......@@ -239,7 +134,7 @@ class GClientKeywords(object):
raise gclient_utils.Error("Var is not defined: %s" % var_name)
class Dependency(GClientKeywords, WorkItem):
class Dependency(GClientKeywords, gclient_utils.WorkItem):
"""Object that represents a dependency checkout."""
DEPS_FILE = 'DEPS'
......@@ -815,7 +710,7 @@ solutions = [
pm = None
if command == 'update' and not self._options.verbose:
pm = Progress('Syncing projects', 1)
work_queue = ExecutionQueue(pm)
work_queue = gclient_utils.ExecutionQueue(pm)
for s in self.dependencies:
work_queue.enqueue(s)
work_queue.flush(self._options, revision_overrides, command, args,
......@@ -860,7 +755,7 @@ solutions = [
if not self.dependencies:
raise gclient_utils.Error('No solution specified')
# Load all the settings.
work_queue = ExecutionQueue(None)
work_queue = gclient_utils.ExecutionQueue(None)
for s in self.dependencies:
work_queue.enqueue(s)
work_queue.flush(self._options, {}, None, [], work_queue)
......
......@@ -22,6 +22,7 @@ import stat
import subprocess
import sys
import time
import threading
import xml.dom.minidom
import xml.parsers.expat
......@@ -363,3 +364,109 @@ def GetGClientRootAndEntries(path=None):
execfile(config_path, env)
config_dir = os.path.dirname(config_path)
return config_dir, env['entries']
class WorkItem(object):
"""One work item."""
# A list of string, each being a WorkItem name.
requirements = []
# A unique string representing this work item.
name = None
def run(self):
pass
class ExecutionQueue(object):
"""Dependencies sometime needs to be run out of order due to From() keyword.
This class manages that all the required dependencies are run before running
each one.
Methods of this class are multithread safe.
"""
def __init__(self, progress):
self.lock = threading.Lock()
# List of WorkItem, Dependency inherits from WorkItem.
self.queued = []
# List of strings representing each Dependency.name that was run.
self.ran = []
# List of items currently running.
self.running = []
self.progress = progress
if self.progress:
self.progress.update()
def enqueue(self, d):
"""Enqueue one Dependency to be executed later once its requirements are
satisfied.
"""
assert isinstance(d, WorkItem)
try:
self.lock.acquire()
self.queued.append(d)
total = len(self.queued) + len(self.ran) + len(self.running)
finally:
self.lock.release()
if self.progress:
self.progress._total = total + 1
self.progress.update(0)
def flush(self, *args, **kwargs):
"""Runs all enqueued items until all are executed."""
while self._run_one_item(*args, **kwargs):
pass
queued = []
running = []
try:
self.lock.acquire()
if self.queued:
queued = self.queued
self.queued = []
if self.running:
running = self.running
self.running = []
finally:
self.lock.release()
if self.progress:
self.progress.end()
if queued:
raise gclient_utils.Error('Entries still queued: %s' % str(queued))
if running:
raise gclient_utils.Error('Entries still queued: %s' % str(running))
def _run_one_item(self, *args, **kwargs):
"""Removes one item from the queue that has all its requirements completed
and execute it.
Returns False if no item could be run.
"""
i = 0
d = None
try:
self.lock.acquire()
while i != len(self.queued) and not d:
d = self.queued.pop(i)
for r in d.requirements:
if not r in self.ran:
self.queued.insert(i, d)
d = None
break
i += 1
if not d:
return False
self.running.append(d)
finally:
self.lock.release()
d.run(*args, **kwargs)
try:
self.lock.acquire()
assert not d.name in self.ran
if not d.name in self.ran:
self.ran.append(d.name)
self.running.remove(d)
if self.progress:
self.progress.update(1)
finally:
self.lock.release()
return True
......@@ -15,14 +15,15 @@ class GclientUtilsUnittest(SuperMoxTestBase):
"""General gclient_utils.py tests."""
def testMembersChanged(self):
members = [
'CheckCall', 'CheckCallError', 'Error', 'FileRead', 'FileWrite',
'FindFileUpwards', 'FindGclientRoot', 'GetGClientRootAndEntries',
'GetNamedNodeText', 'GetNodeNamedAttributeText',
'PathDifference', 'ParseXML', 'PrintableObject', 'RemoveDirectory',
'SplitUrlRevision', 'SubprocessCall', 'SubprocessCallAndFilter',
'SyntaxErrorToError',
'errno', 'logging', 'os', 're', 'stat', 'subprocess', 'sys', 'time',
'xml',
'CheckCall', 'CheckCallError', 'Error', 'ExecutionQueue', 'FileRead',
'FileWrite', 'FindFileUpwards', 'FindGclientRoot',
'GetGClientRootAndEntries', 'GetNamedNodeText',
'GetNodeNamedAttributeText', 'PathDifference', 'ParseXML',
'PrintableObject', 'RemoveDirectory', 'SplitUrlRevision',
'SubprocessCall', 'SubprocessCallAndFilter', 'SyntaxErrorToError',
'WorkItem',
'errno', 'logging', 'os', 're', 'stat', 'subprocess', 'sys',
'threading', 'time', 'xml',
]
# If this test fails, you should add the relevant test.
self.compareMembers(gclient_utils, members)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment