Commit a9677a5e authored by Gavin Mak's avatar Gavin Mak Committed by LUCI CQ

Revert "Remove old DepotToolsOwners implementation"

This reverts commit 10dbd7ba.

Reason for revert: removal broke builders

Original change's description:
> Remove old DepotToolsOwners implementation
>
> code-owners should have been enabled for most hosts that depot_tools
> supports by now. Remove our own implementation and rely on code-owners.
>
> Change-Id: Iaf0d3db65b2e5063b67d42b92188c4ec51d2cd9a
> Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/3783475
> Reviewed-by: Aravind Vasudevan <aravindvasudev@google.com>
> Reviewed-by: Joanna Wang <jojwang@chromium.org>
> Commit-Queue: Gavin Mak <gavinmak@google.com>

Change-Id: I781648f470e8f2c9d408ae7112cfc41060f5385d
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/3818251
Auto-Submit: Gavin Mak <gavinmak@google.com>
Reviewed-by: 's avatarAravind Vasudevan <aravindvasudev@google.com>
Commit-Queue: Aravind Vasudevan <aravindvasudev@google.com>
parent 10dbd7ba
......@@ -1023,6 +1023,8 @@ class Changelist(object):
remote, remote_branch = self.GetRemoteBranch()
branch = GetTargetRef(remote, remote_branch, None)
self._owners_client = owners_client.GetCodeOwnersClient(
root=settings.GetRoot(),
upstream=self.GetCommonAncestorWithUpstream(),
host=self.GetGerritHost(),
project=self.GetGerritProject(),
branch=branch)
......
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
r"""A database of OWNERS files.
OWNERS files indicate who is allowed to approve changes in a specific directory
(or who is allowed to make changes without needing approval of another OWNER).
Note that all changes must still be reviewed by someone familiar with the code,
so you may need approval from both an OWNER and a reviewer in many cases.
The syntax of the OWNERS file is, roughly:
lines := (\s* line? \s* comment? \s* "\n")*
line := directive
| "per-file" \s+ glob \s* "=" \s* directive
directive := "set noparent"
| "file:" owner_file
| email_address
| "*"
glob := [a-zA-Z0-9_-*?]+
comment := "#" [^"\n"]*
owner_file := "OWNERS"
| [^"\n"]* "_OWNERS"
Email addresses must follow the foo@bar.com short form (exact syntax given
in BASIC_EMAIL_REGEXP, below). Filename globs follow the simple unix
shell conventions, and relative and absolute paths are not allowed (i.e.,
globs only refer to the files in the current directory).
If a user's email is one of the email_addresses in the file, the user is
considered an "OWNER" for all files in the directory.
If the "per-file" directive is used, the line only applies to files in that
directory that match the filename glob specified.
If the "set noparent" directive used, then only entries in this OWNERS file
apply to files in this directory; if the "set noparent" directive is not
used, then entries in OWNERS files in enclosing (upper) directories also
apply (up until a "set noparent is encountered").
If "per-file glob=set noparent" is used, then global directives are ignored
for the glob, and only the "per-file" owners are used for files matching that
glob.
If the "file:" directive is used, the referred to OWNERS file will be parsed and
considered when determining the valid set of OWNERS. If the filename starts with
"//" it is relative to the root of the repository, otherwise it is relative to
the current file. The referred to file *must* be named OWNERS or end in a suffix
of _OWNERS.
Examples for all of these combinations can be found in tests/owners_unittest.py.
"""
import collections
import fnmatch
import os
import random
import re
try:
# This fallback applies for all versions of Python before 3.3
import collections.abc as collections_abc
except ImportError:
import collections as collections_abc
# If this is present by itself on a line, this means that everyone can review.
EVERYONE = '*'
# Recognizes 'X@Y' email addresses. Very simplistic.
BASIC_EMAIL_REGEXP = r'^[\w\-\+\%\.]+\@[\w\-\+\%\.]+$'
# Key for global comments per email address. Should be unlikely to be a
# pathname.
GLOBAL_STATUS = '*'
# Returned if there is no owner and anyone +1
ANYONE='<anyone>'
def _assert_is_collection(obj):
assert not isinstance(obj, str)
# Module 'collections' has no 'Iterable' member
# pylint: disable=no-member
if hasattr(collections_abc, 'Iterable') and hasattr(collections_abc, 'Sized'):
assert (isinstance(obj, collections_abc.Iterable) and
isinstance(obj, collections_abc.Sized))
class SyntaxErrorInOwnersFile(Exception):
def __init__(self, path, lineno, msg):
super(SyntaxErrorInOwnersFile, self).__init__((path, lineno, msg))
self.path = path
self.lineno = lineno
self.msg = msg
def __str__(self):
return '%s:%d syntax error: %s' % (self.path, self.lineno, self.msg)
class Database(object):
"""A database of OWNERS files for a repository.
This class allows you to find a suggested set of reviewers for a list
of changed files, and see if a list of changed files is covered by a
list of reviewers."""
def __init__(self, root, fopen, os_path):
"""Args:
root: the path to the root of the Repository
open: function callback to open a text file for reading
os_path: module/object callback with fields for 'abspath', 'dirname',
'exists', 'join', and 'relpath'
"""
self.root = root
self.fopen = fopen
self.os_path = os_path
# Pick a default email regexp to use; callers can override as desired.
self.email_regexp = re.compile(BASIC_EMAIL_REGEXP)
# Replacement contents for the given files. Maps the file name of an
# OWNERS file (relative to root) to an iterator returning the replacement
# file contents.
self.override_files = {}
# Mapping of owners to the paths or globs they own.
self._owners_to_paths = {EVERYONE: set()}
# Mappings of directories -> globs in the directory -> owners
# Example: "chrome/browser" -> "chrome/browser/*.h" -> ("john", "maria")
# Paths used as keys must use slashes as the separator, even on Windows.
self._paths_to_owners = {}
# Mapping reviewers to the preceding comment per file in the OWNERS files.
self.comments = {}
# Cache of compiled regexes for _fnmatch()
self._fnmatch_cache = {}
# Sets of paths that stop us from looking above them for owners.
# (This is implicitly true for the root directory).
#
# The implementation is a mapping:
# Directory -> globs in the directory,
#
# Example:
# 'ui/events/devices/mojo' -> 'ui/events/devices/mojo/*_struct_traits*.*'
self._stop_looking = {'': set([''])}
# Set of files which have already been read.
self.read_files = set()
# Set of files which were included from other files. Files are processed
# differently depending on whether they are regular owners files or
# being included from another file.
self._included_files = {}
# File with global status lines for owners.
self._status_file = None
def _file_affects_ownership(self, path):
"""Returns true if the path refers to a file that could affect ownership."""
filename = self.os_path.split(path)[-1]
return filename == 'OWNERS' or filename.endswith('_OWNERS')
def reviewers_for(self, files, author):
"""Returns a suggested set of reviewers that will cover the files.
files is a sequence of paths relative to (and under) self.root.
If author is nonempty, we ensure it is not included in the set returned
in order avoid suggesting the author as a reviewer for their own changes."""
self._check_paths(files)
self.load_data_needed_for(files)
suggested_owners = self._covering_set_of_owners_for(files, author)
if EVERYONE in suggested_owners:
if len(suggested_owners) > 1:
suggested_owners.remove(EVERYONE)
else:
suggested_owners = set([ANYONE])
return suggested_owners
def files_not_covered_by(self, files, reviewers):
"""Returns the files not owned by one of the reviewers.
Args:
files is a sequence of paths relative to (and under) self.root.
reviewers is a sequence of strings matching self.email_regexp.
"""
self._check_paths(files)
self._check_reviewers(reviewers)
self.load_data_needed_for(files)
return set(f for f in files if not self.is_covered_by(f, reviewers))
def _check_paths(self, files):
def _is_under(f, pfx):
return self.os_path.abspath(self.os_path.join(pfx, f)).startswith(pfx)
_assert_is_collection(files)
assert all(not self.os_path.isabs(f) and
_is_under(f, self.os_path.abspath(self.root)) for f in files)
def _check_reviewers(self, reviewers):
_assert_is_collection(reviewers)
assert all(self.email_regexp.match(r) for r in reviewers), reviewers
def is_covered_by(self, objname, reviewers):
reviewers = list(reviewers) + [EVERYONE]
while True:
for reviewer in reviewers:
for owned_pattern in self._owners_to_paths.get(reviewer, set()):
if fnmatch.fnmatch(objname, owned_pattern):
return True
if self._should_stop_looking(objname):
break
objname = self.os_path.dirname(objname)
return False
def enclosing_dir_with_owners(self, objname):
"""Returns the innermost enclosing directory that has an OWNERS file."""
dirpath = objname
while not self._owners_for(dirpath):
if self._should_stop_looking(dirpath):
break
dirpath = self.os_path.dirname(dirpath)
return dirpath
def load_data_needed_for(self, files):
self._read_global_comments()
visited_dirs = set()
for f in files:
# Always use slashes as separators.
f = f.replace(os.sep, '/')
dirpath = self.os_path.dirname(f)
while dirpath not in visited_dirs:
visited_dirs.add(dirpath)
obj_owners = self._owners_for(dirpath)
if obj_owners:
break
self._read_owners(self.os_path.join(dirpath, 'OWNERS'))
if self._should_stop_looking(dirpath):
break
dirpath = self.os_path.dirname(dirpath)
def _should_stop_looking(self, objname):
dirname = objname
while True:
if dirname in self._stop_looking:
if any(self._fnmatch(objname, stop_looking)
for stop_looking in self._stop_looking[dirname]):
return True
up_dirname = self.os_path.dirname(dirname)
if up_dirname == dirname:
break
dirname = up_dirname
return False
def _get_root_affected_dir(self, obj_name):
"""Returns the deepest directory/path that is affected by a file pattern
|obj_name|."""
root_affected_dir = obj_name
while '*' in root_affected_dir:
root_affected_dir = self.os_path.dirname(root_affected_dir)
return root_affected_dir
def _owners_for(self, objname):
obj_owners = set()
# Possibly relevant rules can be found stored at every directory
# level so iterate upwards, looking for them.
dirname = objname
while True:
dir_owner_rules = self._paths_to_owners.get(dirname)
if dir_owner_rules:
for owned_path, path_owners in dir_owner_rules.items():
if self._fnmatch(objname, owned_path):
obj_owners |= path_owners
up_dirname = self.os_path.dirname(dirname)
if up_dirname == dirname:
break
dirname = up_dirname
return obj_owners
def _read_owners(self, path):
owners_path = self.os_path.join(self.root, path)
if not (self.os_path.exists(owners_path) or (path in self.override_files)):
return
if owners_path in self.read_files:
return
self.read_files.add(owners_path)
is_toplevel = path == 'OWNERS'
comment = []
dirpath = self.os_path.dirname(path)
in_comment = False
# We treat the beginning of the file as an blank line.
previous_line_was_blank = True
reset_comment_after_use = False
lineno = 0
if path in self.override_files:
file_iter = self.override_files[path]
else:
file_iter = self.fopen(owners_path)
for line in file_iter:
lineno += 1
line = line.strip()
if line.startswith('#'):
if is_toplevel:
m = re.match(r'#\s*OWNERS_STATUS\s+=\s+(.+)$', line)
if m:
self._status_file = m.group(1).strip()
continue
if not in_comment:
comment = []
reset_comment_after_use = not previous_line_was_blank
comment.append(line[1:].strip())
in_comment = True
continue
in_comment = False
if line == '':
comment = []
previous_line_was_blank = True
continue
# If the line ends with a comment, strip the comment and store it for this
# line only.
line, _, line_comment = line.partition('#')
line = line.strip()
line_comment = [line_comment.strip()] if line_comment else []
previous_line_was_blank = False
if line == 'set noparent':
self._stop_looking.setdefault(
self._get_root_affected_dir(dirpath), set()).add(dirpath)
continue
m = re.match('per-file (.+)=(.+)', line)
if m:
glob_string = m.group(1).strip()
directive = m.group(2).strip()
full_glob_string = self.os_path.join(self.root, dirpath, glob_string)
if '/' in glob_string or '\\' in glob_string:
raise SyntaxErrorInOwnersFile(owners_path, lineno,
'per-file globs cannot span directories or use escapes: "%s"' %
line)
relative_glob_string = self.os_path.relpath(full_glob_string, self.root)
self._add_entry(relative_glob_string, directive, owners_path,
lineno, '\n'.join(comment + line_comment))
if reset_comment_after_use:
comment = []
continue
if line.startswith('set '):
raise SyntaxErrorInOwnersFile(owners_path, lineno,
'unknown option: "%s"' % line[4:].strip())
self._add_entry(dirpath, line, owners_path, lineno,
' '.join(comment + line_comment))
if reset_comment_after_use:
comment = []
def _read_global_comments(self):
if not self._status_file:
if not 'OWNERS' in self.read_files:
self._read_owners('OWNERS')
if not self._status_file:
return
owners_status_path = self.os_path.join(self.root, self._status_file)
if not self.os_path.exists(owners_status_path):
raise IOError('Could not find global status file "%s"' %
owners_status_path)
if owners_status_path in self.read_files:
return
self.read_files.add(owners_status_path)
lineno = 0
for line in self.fopen(owners_status_path):
lineno += 1
line = line.strip()
if line.startswith('#'):
continue
if line == '':
continue
m = re.match('(.+?):(.+)', line)
if m:
owner = m.group(1).strip()
comment = m.group(2).strip()
if not self.email_regexp.match(owner):
raise SyntaxErrorInOwnersFile(owners_status_path, lineno,
'invalid email address: "%s"' % owner)
self.comments.setdefault(owner, {})
self.comments[owner][GLOBAL_STATUS] = comment
continue
raise SyntaxErrorInOwnersFile(owners_status_path, lineno,
'cannot parse status entry: "%s"' % line.strip())
def _add_entry(self, owned_paths, directive, owners_path, lineno, comment):
# Consistently uses paths with slashes as the keys or else Windows will
# break in surprising and untested ways.
owned_paths = owned_paths.replace(os.sep, '/')
if directive == 'set noparent':
self._stop_looking.setdefault(
self._get_root_affected_dir(owned_paths), set()).add(owned_paths)
elif directive.startswith('file:'):
include_file = self._resolve_include(directive[5:], owners_path, lineno)
if not include_file:
raise SyntaxErrorInOwnersFile(owners_path, lineno,
('%s does not refer to an existing file.' % directive[5:]))
included_owners = self._read_just_the_owners(include_file)
for owner in included_owners:
self._owners_to_paths.setdefault(owner, set()).add(owned_paths)
self._paths_to_owners.setdefault(
self._get_root_affected_dir(owned_paths), {}).setdefault(
owned_paths, set()).add(owner)
elif self.email_regexp.match(directive) or directive == EVERYONE:
if comment:
self.comments.setdefault(directive, {})
self.comments[directive][owned_paths] = comment
self._owners_to_paths.setdefault(directive, set()).add(owned_paths)
self._paths_to_owners.setdefault(
self._get_root_affected_dir(owned_paths), {}).setdefault(
owned_paths, set()).add(directive)
else:
raise SyntaxErrorInOwnersFile(owners_path, lineno,
('"%s" is not a "set noparent", file include, "*", '
'or an email address.' % (directive,)))
def _resolve_include(self, path, start, lineno):
if path.startswith('//'):
include_path = path[2:]
else:
assert start.startswith(self.root)
start = self.os_path.dirname(self.os_path.relpath(start, self.root))
include_path = self.os_path.normpath(self.os_path.join(start, path))
if include_path in self.override_files:
return include_path
owners_path = self.os_path.join(self.root, include_path)
# Paths included via "file:" must end in OWNERS or _OWNERS. Files that can
# affect ownership have a different set of ownership rules, so that users
# cannot self-approve changes adding themselves to an OWNERS file.
if not self._file_affects_ownership(owners_path):
raise SyntaxErrorInOwnersFile(start, lineno, 'file: include must specify '
'a file named OWNERS or ending in _OWNERS')
if not self.os_path.exists(owners_path):
return None
return include_path
def _read_just_the_owners(self, include_file):
if include_file in self._included_files:
return self._included_files[include_file]
owners = set()
self._included_files[include_file] = owners
lineno = 0
if include_file in self.override_files:
file_iter = self.override_files[include_file]
else:
file_iter = self.fopen(self.os_path.join(self.root, include_file))
for line in file_iter:
lineno += 1
line = line.strip()
if (line.startswith('#') or line == '' or
line.startswith('set noparent') or
line.startswith('per-file')):
continue
# If the line ends with a comment, strip the comment.
line, _delim, _comment = line.partition('#')
line = line.strip()
if self.email_regexp.match(line) or line == EVERYONE:
owners.add(line)
continue
if line.startswith('file:'):
sub_include_file = self._resolve_include(line[5:], include_file, lineno)
sub_owners = self._read_just_the_owners(sub_include_file)
owners.update(sub_owners)
continue
raise SyntaxErrorInOwnersFile(include_file, lineno,
('"%s" is not a "set noparent", file include, "*", '
'or an email address.' % (line,)))
return owners
def _covering_set_of_owners_for(self, files, author):
dirs_remaining = set()
for f in files:
dir_path = self.enclosing_dir_with_owners(f)
# Always use slashes as separators.
dirs_remaining.add(dir_path.replace(os.sep, '/'))
all_possible_owners = self.all_possible_owners(dirs_remaining, author)
suggested_owners = set()
while dirs_remaining and all_possible_owners:
owner = self.lowest_cost_owner(all_possible_owners, dirs_remaining)
suggested_owners.add(owner)
dirs_to_remove = set(el[0] for el in all_possible_owners[owner])
dirs_remaining -= dirs_to_remove
# Now that we've used `owner` and covered all their dirs, remove them
# from consideration.
del all_possible_owners[owner]
for o, dirs in list(all_possible_owners.items()):
new_dirs = [(d, dist) for (d, dist) in dirs if d not in dirs_to_remove]
if not new_dirs:
del all_possible_owners[o]
else:
all_possible_owners[o] = new_dirs
return suggested_owners
def _all_possible_owners_for_dir_or_file(self, dir_or_file, author,
cache):
"""Returns a dict of {potential owner: (dir_or_file, distance)} mappings.
"""
assert not dir_or_file.startswith("/")
res = cache.get(dir_or_file)
if res is None:
res = {}
dirname = dir_or_file
for owner in self._owners_for(dirname):
if author and owner == author:
continue
res.setdefault(owner, [])
res[owner] = (dir_or_file, 1)
if not self._should_stop_looking(dirname):
dirname = self.os_path.dirname(dirname)
parent_res = self._all_possible_owners_for_dir_or_file(dirname,
author, cache)
# Merge the parent information with our information, adjusting
# distances as necessary, and replacing the parent directory
# names with our names.
for owner, par_dir_and_distances in parent_res.items():
if owner in res:
# If the same person is in multiple OWNERS files above a given
# directory, only count the closest one.
continue
parent_distance = par_dir_and_distances[1]
res[owner] = (dir_or_file, parent_distance + 1)
cache[dir_or_file] = res
return res
def all_possible_owners(self, dirs_and_files, author):
"""Returns a dict of {potential owner: (dir, distance)} mappings.
A distance of 1 is the lowest/closest possible distance (which makes the
subsequent math easier).
"""
self.load_data_needed_for(dirs_and_files)
all_possible_owners_for_dir_or_file_cache = {}
all_possible_owners = {}
for current_dir in dirs_and_files:
# Always use slashes as separators.
current_dir = current_dir.replace(os.sep, '/')
dir_owners = self._all_possible_owners_for_dir_or_file(
current_dir, author,
all_possible_owners_for_dir_or_file_cache)
for owner, dir_and_distance in dir_owners.items():
if owner in all_possible_owners:
all_possible_owners[owner].append(dir_and_distance)
else:
all_possible_owners[owner] = [dir_and_distance]
return all_possible_owners
def _fnmatch(self, filename, pattern):
"""Same as fnmatch.fnmatch(), but internally caches the compiled regexes."""
# Make sure backslashes are never used in the filename. The regex
# expressions generated by fnmatch.translate don't handle matching slashes
# to backslashes.
filename = filename.replace(os.sep, '/')
assert pattern.count('\\') == 0, 'Backslashes found in %s' % pattern
matcher = self._fnmatch_cache.get(pattern)
if matcher is None:
matcher = re.compile(fnmatch.translate(pattern)).match
self._fnmatch_cache[pattern] = matcher
return matcher(filename)
@staticmethod
def total_costs_by_owner(all_possible_owners, dirs):
# We want to minimize both the number of reviewers and the distance
# from the files/dirs needing reviews. The "pow(X, 1.75)" below is
# an arbitrarily-selected scaling factor that seems to work well - it
# will select one reviewer in the parent directory over three reviewers
# in subdirs, but not one reviewer over just two.
result = {}
for owner in all_possible_owners:
total_distance = 0
num_directories_owned = 0
for dirname, distance in all_possible_owners[owner]:
if dirname in dirs:
total_distance += distance
num_directories_owned += 1
if num_directories_owned:
result[owner] = (total_distance /
pow(num_directories_owned, 1.75))
return result
@staticmethod
def lowest_cost_owner(all_possible_owners, dirs):
total_costs_by_owner = Database.total_costs_by_owner(all_possible_owners,
dirs)
# Return the lowest cost owner. In the case of a tie, pick one randomly.
lowest_cost = min(total_costs_by_owner.values())
lowest_cost_owners = [
owner for owner, cost in total_costs_by_owner.items()
if cost == lowest_cost]
return random.Random().choice(lowest_cost_owners)
def owners_rooted_at_file(self, filename):
"""Returns a set of all owners transitively listed in filename.
This function returns a set of all the owners either listed in filename, or
in a file transitively included by filename. Lines that are not plain owners
(i.e. per-file owners) are ignored.
"""
return self._read_just_the_owners(filename)
......@@ -2,11 +2,15 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import itertools
import os
import random
import threading
import gerrit_util
import git_common
import owners as owners_db
import scm
class OwnersClient(object):
......@@ -113,6 +117,47 @@ class OwnersClient(object):
return selected
class DepotToolsClient(OwnersClient):
"""Implement OwnersClient using owners.py Database."""
def __init__(self, root, branch, fopen=open, os_path=os.path):
super(DepotToolsClient, self).__init__()
self._root = root
self._branch = branch
self._fopen = fopen
self._os_path = os_path
self._db = None
self._db_lock = threading.Lock()
def _ensure_db(self):
if self._db is not None:
return
self._db = owners_db.Database(self._root, self._fopen, self._os_path)
self._db.override_files = self._GetOriginalOwnersFiles()
def _GetOriginalOwnersFiles(self):
return {
f: scm.GIT.GetOldContents(self._root, f, self._branch).splitlines()
for _, f in scm.GIT.CaptureStatus(self._root, self._branch)
if os.path.basename(f) == 'OWNERS'
}
def ListOwners(self, path):
# all_possible_owners is not thread safe.
with self._db_lock:
self._ensure_db()
# all_possible_owners returns a dict {owner: [(path, distance)]}. We want
# to return a list of owners sorted by increasing distance.
distance_by_owner = self._db.all_possible_owners([path], None)
# We add a small random number to the distance, so that owners at the
# same distance are returned in random order to avoid overloading those
# who would appear first.
return sorted(
distance_by_owner,
key=lambda o: distance_by_owner[o][0][1] + random.random())
class GerritClient(OwnersClient):
"""Implement OwnersClient using OWNERS REST API."""
def __init__(self, host, project, branch):
......@@ -151,14 +196,11 @@ class GerritClient(OwnersClient):
return self._owners_cache[path]
def GetCodeOwnersClient(host, project, branch):
def GetCodeOwnersClient(root, upstream, host, project, branch):
"""Get a new OwnersClient.
Uses GerritClient and raises an exception if code-owners plugin is not
available."""
Defaults to GerritClient, and falls back to DepotToolsClient if code-owners
plugin is not available."""
if gerrit_util.IsCodeOwnersEnabledOnHost(host):
return GerritClient(host, project, branch)
raise Exception(
'code-owners plugin is not enabled. Ask your host admin to enable it '
'on %s. Read more about code-owners at '
'https://gerrit.googlesource.com/plugins/code-owners.' % host)
return DepotToolsClient(root, upstream)
......@@ -44,6 +44,7 @@ import gclient_paths # Exposed through the API
import gclient_utils
import git_footers
import gerrit_util
import owners as owners_db
import owners_client
import owners_finder
import presubmit_canned_checks
......@@ -679,11 +680,15 @@ class InputApi(object):
if self.gerrit and not 'PRESUBMIT_SKIP_NETWORK' in self.environ:
try:
self.owners_client = owners_client.GetCodeOwnersClient(
root=change.RepositoryRoot(),
upstream=change.UpstreamBranch(),
host=self.gerrit.host,
project=self.gerrit.project,
branch=self.gerrit.branch)
except Exception as e:
print('Failed to set owners_client - %s' % str(e))
self.owners_db = owners_db.Database(
change.RepositoryRoot(), fopen=open, os_path=self.os_path)
self.owners_finder = owners_finder.OwnersFinder
self.verbose = verbose
self.Command = CommandData
......
......@@ -26,6 +26,41 @@ dave = 'dave@example.com'
emily = 'emily@example.com'
class DepotToolsClientTest(unittest.TestCase):
def setUp(self):
self.repo = filesystem_mock.MockFileSystem(files={
'/OWNERS': '\n'.join([
'per-file approved.cc=approver@example.com',
'per-file reviewed.h=reviewer@example.com',
'missing@example.com',
]),
'/approved.cc': '',
'/reviewed.h': '',
'/bar/insufficient_reviewers.py': '',
'/bar/everyone/OWNERS': '*',
'/bar/everyone/foo.txt': '',
})
self.root = '/'
self.fopen = self.repo.open_for_reading
self.addCleanup(mock.patch.stopall)
self.client = owners_client.DepotToolsClient(
'/', 'branch', self.fopen, self.repo)
@mock.patch('scm.GIT.CaptureStatus')
@mock.patch('scm.GIT.GetOldContents')
def testListOwners(self, mockGetOldContents, mockCaptureStatus):
mockGetOldContents.side_effect = lambda r, f, _b: self.repo.files[r + f]
mockCaptureStatus.return_value = [
('M', 'bar/everyone/foo.txt'),
('M', 'OWNERS'),
]
self.assertEqual(
['*', 'missing@example.com'],
self.client.ListOwners('bar/everyone/foo.txt'))
mockCaptureStatus.assert_called_once_with('/', 'branch')
class GerritClientTest(unittest.TestCase):
def setUp(self):
self.client = owners_client.GerritClient('host', 'project', 'branch')
......@@ -260,16 +295,17 @@ class GetCodeOwnersClientTest(unittest.TestCase):
mock.patch('gerrit_util.IsCodeOwnersEnabledOnHost').start()
self.addCleanup(mock.patch.stopall)
def testGetCodeOwnersClient_CodeOwnersEnabled(self):
def testGetCodeOwnersClient_GerritClient(self):
gerrit_util.IsCodeOwnersEnabledOnHost.return_value = True
self.assertIsInstance(
owners_client.GetCodeOwnersClient('host', 'project', 'branch'),
owners_client.GetCodeOwnersClient('', '', 'host', 'project', 'branch'),
owners_client.GerritClient)
def testGetCodeOwnersClient_CodeOwnersDisabled(self):
def testGetCodeOwnersClient_DepotToolsClient(self):
gerrit_util.IsCodeOwnersEnabledOnHost.return_value = False
with self.assertRaises(Exception):
owners_client.GetCodeOwnersClient('', '', '')
self.assertIsInstance(
owners_client.GetCodeOwnersClient('root', 'branch', '', '', ''),
owners_client.DepotToolsClient)
if __name__ == '__main__':
......
#!/usr/bin/env vpython3
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for owners.py."""
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from testing_support import filesystem_mock
import owners
ben = 'ben@example.com'
brett = 'brett@example.com'
darin = 'darin@example.com'
jochen = 'jochen@example.com'
john = 'john@example.com'
ken = 'ken@example.com'
peter = 'peter@example.com'
tom = 'tom@example.com'
def owners_file(*email_addresses, **kwargs):
s = ''
if kwargs.get('comment'):
s += '# %s\n' % kwargs.get('comment')
if kwargs.get('noparent'):
s += 'set noparent\n'
if kwargs.get('file'):
s += 'file:%s\n' % kwargs.get('file')
if kwargs.get('lines'):
s += '\n'.join(kwargs.get('lines', [])) + '\n'
return s + '\n'.join(email_addresses) + '\n'
def test_repo():
return filesystem_mock.MockFileSystem(files={
'/DEPS' : '',
'/OWNERS': owners_file(owners.EVERYONE),
'/base/vlog.h': '',
'/chrome/OWNERS': owners_file(ben, brett),
'/chrome/browser/OWNERS': owners_file(brett),
'/chrome/browser/defaults.h': '',
'/chrome/gpu/OWNERS': owners_file(ken),
'/chrome/gpu/gpu_channel.h': '',
'/chrome/comment/OWNERS': owners_file(file='//content/comment/OWNERS'),
'/chrome/renderer/OWNERS': owners_file(peter),
'/chrome/renderer/gpu/gpu_channel_host.h': '',
'/chrome/renderer/safe_browsing/scorer.h': '',
'/chrome/tools/OWNERS': owners_file(file='../OWNERS'),
'/content/OWNERS': owners_file(john, darin, comment='foo', noparent=True),
'/content/comment/OWNERS': owners_file(john + ' # for comments',
darin + ' # for everything else'),
'/content/content.gyp': '',
'/content/bar/foo.cc': '',
'/content/baz/OWNERS': owners_file(brett),
'/content/baz/froboz.h': '',
'/content/baz/ugly.cc': '',
'/content/baz/ugly.h': '',
'/content/garply/OWNERS': owners_file(file='test/OWNERS'),
'/content/garply/foo.cc': '',
'/content/garply/test/OWNERS': owners_file(peter),
'/content/qux/OWNERS': owners_file(peter, file='//content/baz/OWNERS'),
'/content/qux/foo.cc': '',
'/content/views/OWNERS': owners_file(ben, john, owners.EVERYONE,
noparent=True),
'/content/views/pie.h': '',
})
class _BaseTestCase(unittest.TestCase):
def setUp(self):
self.repo = test_repo()
self.files = self.repo.files
self.root = '/'
self.fopen = self.repo.open_for_reading
def db(self, root=None, fopen=None, os_path=None):
root = root or self.root
fopen = fopen or self.fopen
os_path = os_path or self.repo
# pylint: disable=no-value-for-parameter
return owners.Database(root, fopen, os_path)
class OwnersDatabaseTest(_BaseTestCase):
def test_constructor(self):
self.assertNotEquals(self.db(), None)
def test_files_not_covered_by__valid_inputs(self):
db = self.db()
# Check that we're passed in a sequence that isn't a string.
self.assertRaises(AssertionError, db.files_not_covered_by, 'foo', [])
if hasattr(owners.collections, 'Iterable'):
self.assertRaises(AssertionError, db.files_not_covered_by,
(f for f in ['x', 'y']), [])
# Check that the files are under the root.
db.root = '/checkout'
self.assertRaises(AssertionError, db.files_not_covered_by,
['/OWNERS'], [])
db.root = '/'
# Check invalid email address.
self.assertRaises(AssertionError, db.files_not_covered_by,
['OWNERS'], ['foo'])
def assert_files_not_covered_by(self, files, reviewers, unreviewed_files):
db = self.db()
self.assertEqual(db.files_not_covered_by(set(files), set(reviewers)),
set(unreviewed_files))
def test_files_not_covered_by__owners_propagates_down(self):
self.assert_files_not_covered_by(
['chrome/gpu/gpu_channel.h', 'chrome/renderer/gpu/gpu_channel_host.h'],
[ben], [])
def test_files_not_covered_by__partial_covering(self):
self.assert_files_not_covered_by(
['content/content.gyp', 'chrome/renderer/gpu/gpu_channel_host.h'],
[peter], ['content/content.gyp'])
def test_files_not_covered_by__set_noparent_works(self):
self.assert_files_not_covered_by(['content/content.gyp'], [ben],
['content/content.gyp'])
def test_files_not_covered_by__no_reviewer(self):
self.assert_files_not_covered_by(
['content/content.gyp', 'chrome/renderer/gpu/gpu_channel_host.h'],
[], ['content/content.gyp'])
def test_files_not_covered_by__combines_directories(self):
self.assert_files_not_covered_by(['content/content.gyp',
'content/bar/foo.cc',
'chrome/renderer/gpu/gpu_channel_host.h'],
[peter],
['content/content.gyp',
'content/bar/foo.cc'])
def test_files_not_covered_by__multiple_directories(self):
self.assert_files_not_covered_by(
['content/content.gyp', # Not covered
'content/bar/foo.cc', # Not covered (combines in)
'content/baz/froboz.h', # Not covered
'chrome/gpu/gpu_channel.h', # Owned by ken
'chrome/renderer/gpu/gpu_channel_host.h' # Owned by * via parent
],
[ken],
['content/content.gyp', 'content/bar/foo.cc', 'content/baz/froboz.h'])
def test_per_file(self):
self.files['/content/baz/OWNERS'] = owners_file(brett,
lines=['per-file ugly.*=tom@example.com'])
# peter isn't allowed to approve ugly.cc
self.assert_files_not_covered_by(['content/baz/ugly.cc'],
[peter],
['content/baz/ugly.cc'])
# brett is allowed to approve ugly.cc
self.assert_files_not_covered_by(['content/baz/ugly.cc'],
[brett],
[])
# tom is allowed to approve ugly.cc, but not froboz.h
self.assert_files_not_covered_by(['content/baz/ugly.cc'],
[tom],
[])
self.assert_files_not_covered_by(['content/baz/froboz.h'],
[tom],
['content/baz/froboz.h'])
def test_per_file_with_spaces(self):
# This is the same as test_per_file(), except that we include spaces
# on the per-file line.
# tom is allowed to approve ugly.cc, but not froboz.h
self.files['/content/baz/OWNERS'] = owners_file(brett,
lines=['per-file ugly.* = tom@example.com'])
# peter isn't allowed to approve ugly.cc
self.assert_files_not_covered_by(['content/baz/ugly.cc'],
[peter],
['content/baz/ugly.cc'])
# brett is allowed to approve ugly.cc
self.assert_files_not_covered_by(['content/baz/ugly.cc'],
[brett],
[])
# tom is allowed to approve ugly.cc, but not froboz.h
self.assert_files_not_covered_by(['content/baz/ugly.cc'],
[tom],
[])
self.assert_files_not_covered_by(['content/baz/froboz.h'],
[tom],
['content/baz/froboz.h'])
def test_per_file_with_nonexistent_file(self):
self.files['/content/baz/OWNERS'] = owners_file(brett,
lines=['per-file ugly.*=tom@example.com'])
# peter isn't allowed to approve ugly.nonexistent.cc, but brett and tom are.
self.assert_files_not_covered_by(['content/baz/ugly.nonexistent.cc'],
[peter],
['content/baz/ugly.nonexistent.cc'])
self.assert_files_not_covered_by(['content/baz/ugly.nonexistent.cc'],
[brett],
[])
self.assert_files_not_covered_by(['content/baz/ugly.nonexistent.cc'],
[tom],
[])
def test_per_file__set_noparent(self):
self.files['/content/baz/OWNERS'] = owners_file(brett,
lines=['per-file ugly.*=tom@example.com',
'per-file ugly.*=set noparent'])
# brett isn't allowed to approve ugly.cc
self.assert_files_not_covered_by(['content/baz/ugly.cc'],
[brett],
['content/baz/ugly.cc'])
# tom is allowed to approve ugly.cc, but not froboz.h
self.assert_files_not_covered_by(['content/baz/ugly.cc'],
[tom],
[])
self.assert_files_not_covered_by(['content/baz/froboz.h'],
[tom],
['content/baz/froboz.h'])
def test_per_file_wildcard(self):
self.files['/OWNERS'] = 'per-file DEPS=*\n'
self.assert_files_not_covered_by(['DEPS'], [brett], [])
def test_mock_relpath(self):
# This test ensures the mock relpath has the arguments in the right
# order; this should probably live someplace else.
self.assertEqual(self.repo.relpath('foo/bar.c', 'foo/'), 'bar.c')
self.assertEqual(self.repo.relpath('/bar.c', '/'), 'bar.c')
def test_per_file_glob_across_dirs_not_allowed(self):
self.files['/OWNERS'] = 'per-file content/*=john@example.org\n'
self.assertRaises(owners.SyntaxErrorInOwnersFile,
self.db().files_not_covered_by, ['DEPS'], [brett])
def test_file_include_absolute_path(self):
self.assert_files_not_covered_by(['content/qux/foo.cc'], [brett], [])
self.assert_files_not_covered_by(['content/qux/bar.cc'], [peter], [])
self.assert_files_not_covered_by(['content/qux/baz.cc'],
[tom], ['content/qux/baz.cc'])
def test_file_include_relative_path(self):
self.assert_files_not_covered_by(['content/garply/foo.cc'], [peter], [])
self.assert_files_not_covered_by(['content/garply/bar.cc'], [darin], [])
self.assert_files_not_covered_by(['content/garply/baz.cc'],
[tom], ['content/garply/baz.cc'])
def test_file_include_relative_path_non_empty_root(self):
old_root = self.root
self.root = '/content'
self.assert_files_not_covered_by(['garply/foo.cc'], [peter], [])
self.assert_files_not_covered_by(['garply/bar.cc'], [darin], [])
self.assert_files_not_covered_by(['garply/baz.cc'],
[tom], ['garply/baz.cc'])
self.root = old_root
def test_file_include_per_file_absolute_path(self):
self.files['/content/qux/OWNERS'] = owners_file(peter,
lines=['per-file foo.*=file://content/baz/OWNERS'])
self.assert_files_not_covered_by(['content/qux/foo.cc'], [brett], [])
self.assert_files_not_covered_by(['content/qux/baz.cc'],
[brett], ['content/qux/baz.cc'])
def test_file_include_per_file_relative_path(self):
self.files['/content/garply/OWNERS'] = owners_file(brett,
lines=['per-file foo.*=file:test/OWNERS'])
self.assert_files_not_covered_by(['content/garply/foo.cc'], [peter], [])
self.assert_files_not_covered_by(['content/garply/baz.cc'],
[peter], ['content/garply/baz.cc'])
def test_file_include_recursive(self):
self.files['/content/baz/OWNERS'] = owners_file(file='//chrome/gpu/OWNERS')
self.assert_files_not_covered_by(['content/qux/foo.cc'], [ken], [])
def test_file_include_different_filename(self):
# This tests that a file named something other than OWNERS is not treated
# like OWNERS; we want to make sure that ken and peter don't become owners
# for /content, and that other owners for content still work.
self.files['/content/baz/OWNERS'] = owners_file(file='//content/BAZ_OWNERS')
self.files['/content/BAZ_OWNERS'] = owners_file([ken, peter])
self.assert_files_not_covered_by(
['content/baz/baz.cc', 'content/qux/foo.cc'],
[ken], ['content/qux/foo.cc'])
self.assert_files_not_covered_by(
['content/baz/baz.cc', 'content/qux/foo.cc'],
[ken, john], [])
def test_file_include_recursive_loop(self):
self.files['/content/baz/OWNERS'] = owners_file(brett,
file='//content/qux/OWNERS')
self.test_file_include_absolute_path()
def test_file_include_different_filename(self):
self.files['/owners/GARPLY_OWNERS'] = owners_file(peter)
self.files['/content/garply/OWNERS'] = owners_file(john,
lines=['per-file foo.*=file://owners/GARPLY_OWNERS'])
self.assert_files_not_covered_by(['content/garply/foo.cc'], [peter], [])
def test_file_include_invalid_filename(self):
self.files['/base/SECURITY_REVIEWERS'] = owners_file(peter)
self.files['/ipc/OWNERS'] = owners_file(file='//base/SECURITY_REVIEWERS')
try:
self.db().reviewers_for(['ipc/ipc_message_utils.h'], None)
self.fail() # pragma: no cover
except owners.SyntaxErrorInOwnersFile as e:
self.assertTrue(str(e).startswith('/ipc/OWNERS:1'))
def test_file_include_with_comment(self):
# See crbug.com/995474 for context.
self.assert_files_not_covered_by(['chrome/comment/comment.cc'], [darin], [])
def assert_syntax_error(self, owners_file_contents):
db = self.db()
self.files['/foo/OWNERS'] = owners_file_contents
self.files['/foo/DEPS'] = ''
try:
db.reviewers_for(['foo/DEPS'], None)
self.fail() # pragma: no cover
except owners.SyntaxErrorInOwnersFile as e:
self.assertTrue(str(e).startswith('/foo/OWNERS:1'))
def test_syntax_error__unknown_token(self):
self.assert_syntax_error('{}\n')
def test_syntax_error__unknown_set(self):
self.assert_syntax_error('set myfatherisbillgates\n')
def test_syntax_error__bad_email(self):
self.assert_syntax_error('ben\n')
def test_syntax_error__invalid_absolute_file(self):
self.assert_syntax_error('file://foo/bar/OWNERS\n')
def test_syntax_error__invalid_relative_file(self):
self.assert_syntax_error('file:foo/bar/OWNERS\n')
def test_non_existant_status_file(self):
db = self.db()
self.files['/OWNERS'] = owners_file(brett,
comment='OWNERS_STATUS = nonexistant')
self.files['/foo/DEPS'] = ''
self.assertRaises(IOError, db.reviewers_for, ['foo/DEPS'], None)
def test_comment_to_owners_mapping(self):
db = self.db()
self.files['/OWNERS'] = '\n'.join([
'# first comment',
ben,
brett + ' # inline comment',
'',
darin,
'',
'# comment preceded by empty line',
'per-file bar.*=%s' % jochen,
john,
'',
ken,
'# comment in the middle',
peter,
tom])
# Force loading of the OWNERS file.
self.files['/bar.cc'] = ''
db.reviewers_for(['bar.cc'], None)
self.assertEqual(db.comments, {
ben: {'': 'first comment'},
brett: {'': 'first comment inline comment'},
jochen: {'bar.*': 'comment preceded by empty line'},
john: {'': 'comment preceded by empty line'},
peter: {'': 'comment in the middle'}})
def test_owners_rooted_at_file(self):
self.files['/foo/OWNERS'] = owners_file(darin, file='//bar/OWNERS')
self.files['/bar/OWNERS'] = owners_file(john,
lines=['per-file nope.cc=' + ben])
db = self.db()
self.assertEqual(db.owners_rooted_at_file('foo/OWNERS'),
set([john, darin]))
class ReviewersForTest(_BaseTestCase):
def assert_reviewers_for(self, files, potential_suggested_reviewers,
author=None, override_files=None):
db = self.db()
db.override_files = override_files or {}
suggested_reviewers = db.reviewers_for(set(files), author)
self.assertTrue(suggested_reviewers in
[set(suggestion) for suggestion in potential_suggested_reviewers])
def test_reviewers_for__basic_functionality(self):
self.assert_reviewers_for(['chrome/gpu/gpu_channel.h'],
[[ken]])
def test_reviewers_for__set_noparent_works(self):
self.assert_reviewers_for(['content/content.gyp'],
[[john],
[darin]])
def test_reviewers_for__relative_owners_file(self):
self.assert_reviewers_for(['chrome/tools/OWNERS'],
[[ben], [brett]])
def test_reviewers_for__valid_inputs(self):
db = self.db()
# Check that we're passed in a sequence that isn't a string.
self.assertRaises(AssertionError, db.reviewers_for, 'foo', None)
if hasattr(owners.collections, 'Iterable'):
self.assertRaises(AssertionError, db.reviewers_for,
(f for f in ['x', 'y']), None)
# Check that the files are under the root.
db.root = '/checkout'
self.assertRaises(AssertionError, db.reviewers_for, ['/OWNERS'], None)
def test_reviewers_for__wildcard_dir(self):
self.assert_reviewers_for(['DEPS'], [['<anyone>']])
self.assert_reviewers_for(['DEPS', 'chrome/gpu/gpu_channel.h'], [[ken]])
def test_reviewers_for__one_owner(self):
self.assert_reviewers_for([
'chrome/gpu/gpu_channel.h',
'content/baz/froboz.h',
'chrome/renderer/gpu/gpu_channel_host.h'],
[[brett]])
def test_reviewers_for__two_owners(self):
self.assert_reviewers_for([
'chrome/gpu/gpu_channel.h',
'content/content.gyp',
'content/baz/froboz.h',
'content/views/pie.h'],
[[ken, john]])
def test_reviewers_for__all_files(self):
self.assert_reviewers_for([
'chrome/gpu/gpu_channel.h',
'chrome/renderer/gpu/gpu_channel_host.h',
'chrome/renderer/safe_browsing/scorer.h',
'content/content.gyp',
'content/bar/foo.cc',
'content/baz/froboz.h',
'content/views/pie.h'],
[[peter, ken, john]])
def test_reviewers_for__per_file_owners_file(self):
self.files['/content/baz/OWNERS'] = owners_file(lines=[
'per-file ugly.*=tom@example.com'])
self.assert_reviewers_for(['content/baz/OWNERS'],
[[john],
[darin]])
def test_reviewers_for__per_file(self):
self.files['/content/baz/OWNERS'] = owners_file(lines=[
'per-file ugly.*=tom@example.com'])
self.assert_reviewers_for(['content/baz/ugly.cc'],
[[tom]])
def test_reviewers_for__two_nested_dirs(self):
# The same owner is listed in two directories (one above the other)
self.assert_reviewers_for(['chrome/browser/defaults.h'],
[[brett]])
# Here, although either ben or brett could review both files,
# someone closer to the gpu_channel_host.h should also be suggested.
# This also tests that we can handle two suggested reviewers
# with overlapping sets of directories properly.
self.files['/chrome/renderer/gpu/OWNERS'] = owners_file(ken)
self.assert_reviewers_for(['chrome/OWNERS',
'chrome/renderer/gpu/gpu_channel_host.h'],
[[ben, ken],
[brett, ken]])
def test_reviewers_for__author_is_known(self):
# We should never suggest ken as a reviewer for his own changes.
self.assert_reviewers_for(['chrome/gpu/gpu_channel.h'],
[[ben], [brett]], author=ken)
def test_reviewers_for__ignores_unowned_files(self):
# Clear the root OWNERS file.
self.files['/OWNERS'] = ''
self.assert_reviewers_for(['base/vlog.h', 'chrome/browser/defaults/h'],
[[brett]])
def test_reviewers_file_includes__absolute(self):
self.assert_reviewers_for(['content/qux/foo.cc'],
[[peter], [brett], [john], [darin]])
def test_reviewers_file_includes__relative(self):
self.assert_reviewers_for(['content/garply/foo.cc'],
[[peter], [john], [darin]])
def test_reviewers_file_includes__per_file(self):
self.files['/content/garply/OWNERS'] = owners_file(brett,
lines=['per-file foo.*=file:test/OWNERS'])
self.assert_reviewers_for(['content/garply/foo.cc'],
[[brett], [peter]])
self.assert_reviewers_for(['content/garply/bar.cc'],
[[brett]])
def test_reviewers_file_includes__per_file_noparent(self):
self.files['/content/garply/OWNERS'] = owners_file(brett,
lines=['per-file foo.*=set noparent',
'per-file foo.*=file:test/OWNERS'])
self.assert_reviewers_for(['content/garply/foo.cc'],
[[peter]])
self.assert_reviewers_for(['content/garply/bar.cc'],
[[brett]])
def test_override_files(self):
self.assert_reviewers_for(['content/baz/froboz.h'], [[jochen]],
override_files={'content/baz/OWNERS': [jochen]})
self.assert_reviewers_for(['content/baz/froboz.h'], [[john],[darin]],
override_files={'content/baz/OWNERS': []})
self.assert_reviewers_for(
['content/baz/froboz.h'], [[jochen]],
override_files={'content/baz/OWNERS': ['file://JOCHEN_OWNERS'],
'JOCHEN_OWNERS': [jochen]})
class LowestCostOwnersTest(_BaseTestCase):
# Keep the data in the test_lowest_cost_owner* methods as consistent with
# test_repo() where possible to minimize confusion.
def check(self, possible_owners, dirs, *possible_lowest_cost_owners):
suggested_owner = owners.Database.lowest_cost_owner(possible_owners, dirs)
self.assertTrue(suggested_owner in possible_lowest_cost_owners)
def test_one_dir_with_owner(self):
# brett is the only immediate owner for stuff in baz; john is also
# an owner, but further removed. We should always get brett.
self.check({brett: [('content/baz', 1)],
john: [('content/baz', 2)]},
['content/baz'],
brett)
# john and darin are owners for content; the suggestion could be either.
def test_one_dir_with_two_owners(self):
self.check({john: [('content', 1)],
darin: [('content', 1)]},
['content'],
john, darin)
def test_one_dir_with_two_owners_in_parent(self):
# As long as the distance is the same, it shouldn't matter (brett isn't
# listed in this case).
self.check({john: [('content/baz', 2)],
darin: [('content/baz', 2)]},
['content/baz'],
john, darin)
def test_two_dirs_two_owners(self):
# If they both match both dirs, they should be treated equally.
self.check({john: [('content/baz', 2), ('content/bar', 2)],
darin: [('content/baz', 2), ('content/bar', 2)]},
['content/baz', 'content/bar'],
john, darin)
# Here brett is better since he's closer for one of the two dirs.
self.check({brett: [('content/baz', 1), ('content/views', 1)],
darin: [('content/baz', 2), ('content/views', 1)]},
['content/baz', 'content/views'],
brett)
def test_hierarchy(self):
# the choices in these tests are more arbitrary value judgements;
# also, here we drift away from test_repo() to cover more cases.
# Here ben isn't picked, even though he can review both; we prefer
# closer reviewers.
self.check({ben: [('chrome/gpu', 2), ('chrome/renderer', 2)],
ken: [('chrome/gpu', 1)],
peter: [('chrome/renderer', 1)]},
['chrome/gpu', 'chrome/renderer'],
ken, peter)
# Here we always pick ben since he can review either dir as well as
# the others but can review both (giving us fewer total reviewers).
self.check({ben: [('chrome/gpu', 1), ('chrome/renderer', 1)],
ken: [('chrome/gpu', 1)],
peter: [('chrome/renderer', 1)]},
['chrome/gpu', 'chrome/renderer'],
ben)
# However, three reviewers is too many, so ben gets this one.
self.check({ben: [('chrome/gpu', 2), ('chrome/renderer', 2),
('chrome/browser', 2)],
ken: [('chrome/gpu', 1)],
peter: [('chrome/renderer', 1)],
brett: [('chrome/browser', 1)]},
['chrome/gpu', 'chrome/renderer',
'chrome/browser'],
ben)
if __name__ == '__main__':
unittest.main()
......@@ -43,7 +43,9 @@ import gclient_utils
import git_cl
import git_common as git
import json
import owners
import owners_client
import owners_finder
import presubmit_support as presubmit
import rdb_wrapper
import scm
......@@ -2565,10 +2567,26 @@ the current line as well!
def GetInputApiWithOWNERS(self, owners_content, code_owners_enabled=False):
input_api = self.GetInputApiWithFiles({'OWNERS': ('M', owners_content)})
owners_file = StringIO(owners_content)
fopen = lambda *args: owners_file
input_api.owners_db = owners.Database('', fopen, os.path)
input_api.gerrit.IsCodeOwnersEnabledOnRepo = lambda: code_owners_enabled
return input_api
def testCheckOwnersFormatWorks(self):
input_api = self.GetInputApiWithOWNERS('\n'.join([
'set noparent',
'per-file lalala = lemur@chromium.org',
]))
self.assertEqual(
[],
presubmit_canned_checks.CheckOwnersFormat(
input_api, presubmit.OutputApi)
)
def testCheckOwnersFormatWorks_CodeOwners(self):
# If code owners is enabled, we rely on it to check owners format instead of
# depot tools.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment