Commit 6dada4ee authored by dpranke@chromium.org's avatar dpranke@chromium.org

make tests work, implement 'set noparent', owners propagating down

Review URL: http://codereview.chromium.org/6627059

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@77351 0039d316-1c4b-4281-b951-d872f2087c98
parent 9ea49d29
......@@ -4,8 +4,15 @@
"""A database of OWNERS files."""
class Assertion(AssertionError):
pass
import re
# If this is present by itself on a line, this means that everyone can review.
EVERYONE = '*'
# Recognizes 'X@Y' email addresses. Very simplistic.
BASIC_EMAIL_REGEXP = r'^[\w\-\+\%\.]+\@[\w\-\+\%\.]+$'
class SyntaxErrorInOwnersFile(Exception):
......@@ -22,10 +29,6 @@ class SyntaxErrorInOwnersFile(Exception):
return "%s:%d syntax error" % (self.path, self.line)
# Wildcard email-address in the OWNERS file.
ANYONE = '*'
class Database(object):
"""A database of OWNERS files for a repository.
......@@ -36,72 +39,113 @@ class Database(object):
def __init__(self, root, fopen, os_path):
"""Args:
root: the path to the root of the Repository
all_owners: the list of every owner in the system
open: function callback to open a text file for reading
os_path: module/object callback with fields for 'exists',
'dirname', and 'join'
os_path: module/object callback with fields for 'abspath', 'dirname',
'exists', and 'join'
"""
self.root = root
self.fopen = fopen
self.os_path = os_path
# Mapping of files to authorized owners.
self.files_owned_by = {}
# TODO: Figure out how to share the owners email addr format w/
# tools/commit-queue/projects.py, especially for per-repo whitelists.
self.email_regexp = re.compile(BASIC_EMAIL_REGEXP)
# Mapping of owners to the files they own.
self.owners_for = {}
# Mapping of owners to the paths they own.
self.owned_by = {EVERYONE: set()}
# In-memory cached map of files to their OWNERS files.
self.owners_file_for = {}
# Mapping of paths to authorized owners.
self.owners_for = {}
# In-memory cache of OWNERS files and their contents
self.owners_files = {}
# Set of paths that stop us from looking above them for owners.
# (This is implicitly true for the root directory).
self.stop_looking = set([''])
def ReviewersFor(self, files):
"""Returns a suggested set of reviewers that will cover the set of files.
The set of files are paths relative to (and under) self.root."""
files is a set of paths relative to (and under) self.root."""
self._CheckPaths(files)
self._LoadDataNeededFor(files)
return self._CoveringSetOfOwnersFor(files)
def FilesAreCoveredBy(self, files, reviewers):
"""Returns whether every file is owned by at least one reviewer."""
return not self.FilesNotCoveredBy(files, reviewers)
def FilesNotCoveredBy(self, files, reviewers):
covered_files = set()
for reviewer in reviewers:
covered_files = covered_files.union(self.files_owned_by[reviewer])
return files.difference(covered_files)
"""Returns the set of files that are not owned by at least one reviewer."""
self._CheckPaths(files)
self._CheckReviewers(reviewers)
if not reviewers:
return files
def _LoadDataNeededFor(self, files):
self._LoadDataNeededFor(files)
files_by_dir = self._FilesByDir(files)
covered_dirs = self._DirsCoveredBy(reviewers)
uncovered_files = []
for d, files_in_d in files_by_dir.iteritems():
if not self._IsDirCoveredBy(d, covered_dirs):
uncovered_files.extend(files_in_d)
return set(uncovered_files)
def _CheckPaths(self, files):
def _isunder(f, pfx):
return self.os_path.abspath(self.os_path.join(pfx, f)).startswith(pfx)
assert all(_isunder(f, self.os_path.abspath(self.root)) for f in files)
def _CheckReviewers(self, reviewers):
"""Verifies each reviewer is a valid email address."""
assert all(self.email_regexp.match(r) for r in reviewers)
def _FilesByDir(self, files):
dirs = {}
for f in files:
self._LoadOwnersFor(f)
def _LoadOwnersFor(self, f):
if f not in self.owners_for:
owner_file = self._FindOwnersFileFor(f)
self.owners_file_for[f] = owner_file
self._ReadOwnersFile(owner_file, f)
def _FindOwnersFileFor(self, f):
# This is really a "do ... until dirname = ''"
dirname = self.os_path.dirname(f)
while dirname:
owner_path = self.os_path.join(dirname, 'OWNERS')
if self.os_path.exists(owner_path):
return owner_path
dirs.setdefault(self.os_path.dirname(f), []).append(f)
return dirs
def _DirsCoveredBy(self, reviewers):
dirs = self.owned_by[EVERYONE]
for r in reviewers:
dirs = dirs | self.owned_by.get(r, set())
return dirs
def _StopLooking(self, dirname):
return dirname in self.stop_looking
def _IsDirCoveredBy(self, dirname, covered_dirs):
while not dirname in covered_dirs and not self._StopLooking(dirname):
dirname = self.os_path.dirname(dirname)
owner_path = self.os_path.join(dirname, 'OWNERS')
if self.os_path.exists(owner_path):
return owner_path
raise Assertion('No OWNERS file found for %s' % f)
def _ReadOwnersFile(self, owner_file, affected_file):
owners_for = self.owners_for.setdefault(affected_file, set())
for owner in self.fopen(owner_file):
owner = owner.strip()
self.files_owned_by.setdefault(owner, set()).add(affected_file)
owners_for.add(owner)
return dirname in covered_dirs
def _LoadDataNeededFor(self, files):
for f in files:
dirpath = self.os_path.dirname(f)
while not dirpath in self.owners_for:
self._ReadOwnersInDir(dirpath)
if self._StopLooking(dirpath):
break
dirpath = self.os_path.dirname(dirpath)
def _ReadOwnersInDir(self, dirpath):
owners_path = self.os_path.join(self.root, dirpath, 'OWNERS')
if not self.os_path.exists(owners_path):
return
lineno = 0
for line in self.fopen(owners_path):
lineno += 1
line = line.strip()
if line.startswith('#'):
continue
if line == 'set noparent':
self.stop_looking.add(dirpath)
continue
if self.email_regexp.match(line) or line == EVERYONE:
self.owned_by.setdefault(line, set()).add(dirpath)
self.owners_for.setdefault(dirpath, set()).add(line)
continue
raise SyntaxErrorInOwnersFile(owners_path, lineno, line)
def _CoveringSetOfOwnersFor(self, files):
# TODO(dpranke): implement the greedy algorithm for covering sets, and
......@@ -109,5 +153,10 @@ class Database(object):
# short combinations of owners.
every_owner = set()
for f in files:
every_owner = every_owner.union(self.owners_for[f])
dirname = self.os_path.dirname(f)
while dirname in self.owners_for:
every_owner |= self.owners_for[dirname]
if self._StopLooking(dirname):
break
dirname = self.os_path.dirname(dirname)
return every_owner
......@@ -32,10 +32,15 @@ class MockFileSystem(object):
def _split(self, path):
return path.rsplit(self.sep, 1)
def abspath(self, path):
if path.endswith(self.sep):
return path[:-1]
return path
def dirname(self, path):
if not self.sep in path:
if self.sep not in path:
return ''
return self._split(path)[0]
return self._split(path)[0] or self.sep
def exists(self, path):
return self.isfile(path) or self.isdir(path)
......@@ -56,7 +61,7 @@ class MockFileSystem(object):
return any(f.startswith(path) for f in files)
def join(self, *comps):
# FIXME: might want tests for this and/or a better comment about how
# TODO: Might want tests for this and/or a better comment about how
# it works.
return re.sub(re.escape(os.path.sep), self.sep, os.path.join(*comps))
......
......@@ -19,15 +19,17 @@ peter = 'peter@example.com'
def owners_file(*email_addresses, **kwargs):
s = ''
if kwargs.get('comment'):
s += '# %s\n' % kwargs.get('comment')
if kwargs.get('noparent'):
s = 'set noparent\n'
s += 'set noparent\n'
return s + '\n'.join(email_addresses) + '\n'
def test_repo():
return filesystem_mock.MockFileSystem(files={
'/DEPS' : '',
'/OWNERS': owners_file('*'),
'/OWNERS': owners_file(owners.EVERYONE),
'/base/vlog.h': '',
'/chrome/OWNERS': owners_file(ben, brett),
'/chrome/gpu/OWNERS': owners_file(ken),
......@@ -35,7 +37,7 @@ def test_repo():
'/chrome/renderer/OWNERS': owners_file(peter),
'/chrome/renderer/gpu/gpu_channel_host.h': '',
'/chrome/renderer/safe_browsing/scorer.h': '',
'/content/OWNERS': owners_file(john, darin, noparent=True),
'/content/OWNERS': owners_file(john, darin, comment='foo', noparent=True),
'/content/content.gyp': '',
})
......@@ -53,70 +55,76 @@ class OwnersDatabaseTest(unittest.TestCase):
os_path = os_path or self.repo
return owners.Database(root, fopen, os_path)
def assertReviewersFor(self, files, expected_reviewers):
db = self.db()
self.assertEquals(db.ReviewersFor(set(files)), set(expected_reviewers))
def test_Constructor(self):
self.assertNotEquals(self.db(), None)
def assertCoveredBy(self, files, reviewers):
def assert_CoveredBy(self, files, reviewers):
db = self.db()
self.assertTrue(db.FilesAreCoveredBy(set(files), set(reviewers)))
def assertNotCoveredBy(self, files, reviewers, unreviewed_files):
def test_CoveredBy_Everyone(self):
self.assert_CoveredBy(['DEPS'], [john])
self.assert_CoveredBy(['DEPS'], [darin])
def test_CoveredBy_Explicit(self):
self.assert_CoveredBy(['content/content.gyp'], [john])
self.assert_CoveredBy(['chrome/gpu/OWNERS'], [ken])
def test_CoveredBy_OwnersPropagatesDown(self):
self.assert_CoveredBy(['chrome/gpu/OWNERS'], [ben])
self.assert_CoveredBy(['/chrome/renderer/gpu/gpu_channel_host.h'], [peter])
def assert_NotCoveredBy(self, files, reviewers, unreviewed_files):
db = self.db()
self.assertEquals(db.FilesNotCoveredBy(set(files), set(reviewers)),
set(unreviewed_files))
def test_constructor(self):
self.assertNotEquals(self.db(), None)
def test_NotCoveredBy_NeedAtLeastOneReviewer(self):
self.assert_NotCoveredBy(['DEPS'], [], ['DEPS'])
def test_owners_for(self):
self.assertReviewersFor(['DEPS'], [owners.ANYONE])
self.assertReviewersFor(['content/content.gyp'], [john, darin])
self.assertReviewersFor(['chrome/gpu/gpu_channel.h'], [ken])
def test_covered_by(self):
self.assertCoveredBy(['DEPS'], [john])
self.assertCoveredBy(['DEPS'], [darin])
self.assertCoveredBy(['content/content.gyp'], [john])
self.assertCoveredBy(['chrome/gpu/OWNERS'], [ken])
self.assertCoveredBy(['chrome/gpu/OWNERS'], [ben])
def test_not_covered_by(self):
self.assertNotCoveredBy(['DEPS'], [], ['DEPS'])
self.assertNotCoveredBy(['content/content.gyp'], [ben],
['content/content.gyp'])
self.assertNotCoveredBy(
['chrome/gpu/gpu_channel.h', 'chrome/renderer/gpu/gpu_channel_host.h'],
[peter], ['chrome/gpu/gpu_channel.h'])
self.assertNotCoveredBy(
def test_NotCoveredBy_OwnersPropagatesDown(self):
self.assert_NotCoveredBy(
['chrome/gpu/gpu_channel.h', 'chrome/renderer/gpu/gpu_channel_host.h'],
[ben], [])
def test_comments_in_owners_file(self):
# pylint: disable=W0212
def test_NotCoveredBy_PartialCovering(self):
self.assert_NotCoveredBy(
['content/content.gyp', 'chrome/renderer/gpu/gpu_channel_host.h'],
[peter], ['content/content.gyp'])
def test_NotCoveredBy_SetNoParentWorks(self):
self.assert_NotCoveredBy(['content/content.gyp'], [ben],
['content/content.gyp'])
def assert_ReviewersFor(self, files, expected_reviewers):
db = self.db()
# Tests that this doesn't raise an error.
db._ReadOwnersFile('OWNERS', 'DEPS')
self.assertEquals(db.ReviewersFor(set(files)), set(expected_reviewers))
def test_ReviewersFor_BasicFunctionality(self):
self.assert_ReviewersFor(['chrome/gpu/gpu_channel.h'],
[ken, ben, brett, owners.EVERYONE])
def test_ReviewersFor_SetNoParentWorks(self):
self.assert_ReviewersFor(['content/content.gyp'], [john, darin])
def test_syntax_error_in_owners_file(self):
# pylint: disable=W0212
def test_ReviewersFor_WildcardDir(self):
self.assert_ReviewersFor(['DEPS'], [owners.EVERYONE])
def assert_SyntaxError(self, owners_file_contents):
db = self.db()
self.files['/foo/OWNERS'] = '{}\n'
self.files['/foo/DEPS'] = '# DEPS\n'
self.assertRaises(owners.SyntaxErrorInOwnersFile, db._ReadOwnersFile,
'/foo/OWNERS', '/foo/DEPS')
self.files['/bar/OWNERS'] = 'set myparentislinus\n'
self.files['/bar/DEPS'] = '# DEPS\n'
self.assertRaises(owners.SyntaxErrorInOwnersFile, db._ReadOwnersFile,
'/bar/OWNERS', '/bar/DEPS')
def test_owners_propagates_down(self):
self.assertCoveredBy(['/chrome/renderer/gpu/gpu_channel_host.h'], [peter])
def test_set_noparent(self):
self.assertNotCoveredBy(['/content/content.gyp'], [peter],
['/content/content.gyp'])
self.files['/foo/OWNERS'] = owners_file_contents
self.files['/foo/DEPS'] = ''
self.assertRaises(owners.SyntaxErrorInOwnersFile, db.ReviewersFor,
['/foo/DEPS'])
def test_SyntaxError_UnknownToken(self):
self.assert_SyntaxError('{}\n')
def test_SyntaxError_UnknownSet(self):
self.assert_SyntaxError('set myfatherisbillgates\n')
def test_SyntaxError_BadEmail(self):
self.assert_SyntaxError('ben\n')
if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment