owners.py 18.6 KB
Newer Older
1
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 3 4
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

5 6 7 8 9 10 11 12 13
"""A database of OWNERS files.

OWNERS files indicate who is allowed to approve changes in a specific directory
(or who is allowed to make changes without needing approval of another OWNER).
Note that all changes must still be reviewed by someone familiar with the code,
so you may need approval from both an OWNER and a reviewer in many cases.

The syntax of the OWNERS file is, roughly:

lines      := (\s* line? \s* comment? \s* "\n")*

line       := directive
           | "per-file" \s+ glob \s* "=" \s* directive

directive  := "set noparent"
           |  "file:" owner_file
           |  email_address
           |  "*"

glob       := [a-zA-Z0-9_-*?]+

comment    := "#" [^"\n"]*

owner_file := "OWNERS"
           |  [^"\n"]* "_OWNERS"

Email addresses must follow the foo@bar.com short form (exact syntax given
in BASIC_EMAIL_REGEXP, below). Filename globs follow the simple unix
shell conventions, and relative and absolute paths are not allowed (i.e.,
globs only refer to the files in the current directory).

If a user's email is one of the email_addresses in the file, the user is
considered an "OWNER" for all files in the directory.

If the "per-file" directive is used, the line only applies to files in that
directory that match the filename glob specified.

If the "set noparent" directive is used, then only entries in this OWNERS file
apply to files in this directory; if the "set noparent" directive is not
used, then entries in OWNERS files in enclosing (upper) directories also
apply (up until a "set noparent" is encountered).

If "per-file glob=set noparent" is used, then global directives are ignored
for the glob, and only the "per-file" owners are used for files matching that
glob.

51 52 53
If the "file:" directive is used, the referred-to OWNERS file will be parsed and
considered when determining the valid set of OWNERS. If the filename starts with
"//" it is relative to the root of the repository, otherwise it is relative to
the current file. The referred-to file *must* be named OWNERS or end in a suffix
of _OWNERS.
56

57 58
Examples for all of these combinations can be found in tests/owners_unittest.py.
"""
59

60
import collections
61
import fnmatch
62
import random
63 64 65 66 67 68 69 70 71
import re


# If this is present by itself on a line, this means that everyone can review.
# It is accepted wherever an email address directive is accepted and is stored
# in the owner maps under this same key.
EVERYONE = '*'


# Recognizes 'X@Y' email addresses. Very simplistic: X and Y must each be a
# non-empty run of word characters, '-', '+', '%' or '.'. Database compiles
# this pattern as its default email_regexp attribute.
BASIC_EMAIL_REGEXP = r'^[\w\-\+\%\.]+\@[\w\-\+\%\.]+$'
72

73

74 75 76 77 78
# Key for global comments per email address. Should be unlikely to be a
# pathname. Entries read from the global status file are stored as
# comments[owner][GLOBAL_STATUS], next to comments keyed by owned path or glob.
GLOBAL_STATUS = '*'


79
def _assert_is_collection(obj):
80
  assert not isinstance(obj, basestring)
81
  # Module 'collections' has no 'Iterable' member
82
  # pylint: disable=no-member
83 84 85
  if hasattr(collections, 'Iterable') and hasattr(collections, 'Sized'):
    assert (isinstance(obj, collections.Iterable) and
            isinstance(obj, collections.Sized))
86 87


88
class SyntaxErrorInOwnersFile(Exception):
  """Raised when a line of an OWNERS-style file cannot be interpreted.

  Attributes:
    path: path of the file containing the offending line.
    lineno: 1-based number of the offending line.
    msg: description of what is wrong with the line.
  """

  def __init__(self, path, lineno, msg):
    location_and_reason = (path, lineno, msg)
    # The base exception receives the whole triple as its single argument.
    super(SyntaxErrorInOwnersFile, self).__init__(location_and_reason)
    self.path, self.lineno, self.msg = location_and_reason

  def __str__(self):
    fields = (self.path, self.lineno, self.msg)
    return '%s:%d syntax error: %s' % fields
97 98


99
class Database(object):
100
  """A database of OWNERS files for a repository.

  This class allows you to find a suggested set of reviewers for a list
  of changed files, and see if a list of changed files is covered by a
  list of reviewers."""

106
  def __init__(self, root, fopen, os_path):
    """Creates an empty database; no OWNERS file is read here.

    Args:
      root: the path to the root of the repository.
      fopen: function callback to open a text file for reading; the returned
          object is iterated line by line.
      os_path: module/object callback with fields for 'abspath', 'dirname',
          'exists', 'isabs', 'join', and 'relpath'.
    """
    self.root, self.fopen, self.os_path = root, fopen, os_path

    # Default matcher for email addresses; callers can override as desired.
    self.email_regexp = re.compile(BASIC_EMAIL_REGEXP)

    # Replacement contents for the given files: {OWNERS file name relative to
    # root: iterator returning the replacement file contents}.
    self.override_files = dict()

    # {owner: set of paths or globs they own}.
    self._owners_to_paths = dict([(EVERYONE, set())])

    # {path or glob: set of authorized owners}.
    self._paths_to_owners = dict()

    # {reviewer: {path or glob: preceding comment in the OWNERS file}}.
    self.comments = dict()

    # {glob pattern: compiled regex match function}, used by _fnmatch().
    self._fnmatch_cache = dict()

    # Paths that stop us from looking above them for owners. The root
    # directory ('') is always one of them.
    self._stop_looking = set([''])

    # Files which have already been read.
    self.read_files = set()

    # Files which were included from other files, mapped to the owners found
    # in them. Included files are processed differently from regular OWNERS
    # files.
    self._included_files = dict()

    # File with global status lines for owners; None until one is discovered.
    self._status_file = None

152
  def reviewers_for(self, files, author):
    """Suggests a set of reviewers that together cover all of the files.

    Args:
      files: a sequence of paths relative to (and under) self.root.
      author: if nonempty, this owner is never suggested, so that authors are
          not proposed as reviewers of their own changes.

    Returns:
      A set of owners. The wildcard owner is dropped when other owners are
      suggested as well; when it is the only suggestion, the set
      {'<anyone>'} is returned instead.
    """
    self._check_paths(files)
    self.load_data_needed_for(files)

    owners = self._covering_set_of_owners_for(files, author)
    if EVERYONE not in owners:
      return owners
    if len(owners) == 1:
      # Only the wildcard covers the files.
      return set(['<anyone>'])
    owners.remove(EVERYONE)
    return owners
168

169 170
  def files_not_covered_by(self, files, reviewers):
    """Finds the files that none of the given reviewers own.

    Args:
        files is a sequence of paths relative to (and under) self.root.
        reviewers is a sequence of strings matching self.email_regexp.

    Returns:
        The set of entries of files for which no reviewer is an owner.
    """
    self._check_paths(files)
    self._check_reviewers(reviewers)
    self.load_data_needed_for(files)

    uncovered = set()
    for path in files:
      if not self._is_obj_covered_by(path, reviewers):
        uncovered.add(path)
    return uncovered
181

182 183
  def _check_paths(self, files):
    """Asserts that files is a collection of relative paths under self.root.

    A path counts as being under the root when, once joined to the root and
    made absolute, it is the root itself or lies below it. A bare string
    prefix test is not enough: '/repo2/x' starts with '/repo' without being
    inside it, so the comparison is done against the root followed by a path
    separator.

    Args:
      files: a collection (not a string) of paths.

    Raises:
      AssertionError: if files is not a collection, or if any path is absolute
          or escapes the root.
    """
    _assert_is_collection(files)

    os_path = self.os_path
    # os_path may be a stand-in object without 'sep'; fall back to '/'.
    sep = getattr(os_path, 'sep', '/')
    root = os_path.abspath(self.root)
    # Stripping first keeps a filesystem root such as '/' from becoming '//'.
    # An empty root (possible with simplified os_path stand-ins) imposes no
    # restriction, exactly as the plain prefix test did.
    root_prefix = root.rstrip(sep) + sep if root else ''

    def _is_under(f):
      path = os_path.abspath(os_path.join(root, f))
      return path == root or path.startswith(root_prefix)

    assert all(not os_path.isabs(f) and _is_under(f) for f in files)
188

189
  def _check_reviewers(self, reviewers):
    """Asserts that reviewers is a collection of valid email addresses.

    Raises:
      AssertionError: if reviewers is not a collection, or if any entry does
          not match self.email_regexp; the message carries the reviewers.
    """
    _assert_is_collection(reviewers)
    looks_like_email = self.email_regexp.match
    assert all(looks_like_email(r) for r in reviewers), reviewers
192

193 194 195 196 197 198 199 200 201
  def _is_obj_covered_by(self, objname, reviewers):
    """Tells whether any reviewer (or the wildcard owner) owns objname.

    The object itself is checked first, then each enclosing directory in
    turn, until a path that stops the upward search has been checked.

    Args:
      objname: path of the file or directory to check.
      reviewers: iterable of reviewer email addresses.

    Returns:
      True if a pattern owned by a reviewer or by the wildcard owner matches
      objname or one of the directories visited, False otherwise.
    """
    candidates = list(reviewers)
    candidates.append(EVERYONE)
    current = objname
    while True:
      if any(fnmatch.fnmatch(current, pattern)
             for candidate in candidates
             for pattern in self._owners_to_paths.get(candidate, ())):
        return True
      if self._should_stop_looking(current):
        return False
      current = self.os_path.dirname(current)
204

Francois Doray's avatar
Francois Doray committed
205
  def enclosing_dir_with_owners(self, objname):
206
    """Returns the innermost enclosing directory that has an OWNERS file."""
207
    dirpath = objname
208 209
    while not self._owners_for(dirpath):
      if self._should_stop_looking(dirpath):
210 211 212 213
        break
      dirpath = self.os_path.dirname(dirpath)
    return dirpath

214
  def load_data_needed_for(self, files):
215
    self._read_global_comments()
216 217
    for f in files:
      dirpath = self.os_path.dirname(f)
218
      while not self._owners_for(dirpath):
219
        self._read_owners(self.os_path.join(dirpath, 'OWNERS'))
220
        if self._should_stop_looking(dirpath):
221 222 223
          break
        dirpath = self.os_path.dirname(dirpath)

224
  def _should_stop_looking(self, objname):
225
    return any(self._fnmatch(objname, stop_looking)
226 227 228 229 230
               for stop_looking in self._stop_looking)

  def _owners_for(self, objname):
    obj_owners = set()
    for owned_path, path_owners in self._paths_to_owners.iteritems():
231
      if self._fnmatch(objname, owned_path):
232 233 234
        obj_owners |= path_owners
    return obj_owners

235 236
  def _read_owners(self, path):
    owners_path = self.os_path.join(self.root, path)
237
    if not (self.os_path.exists(owners_path) or (path in self.override_files)):
238
      return
239 240 241 242 243 244

    if owners_path in self.read_files:
      return

    self.read_files.add(owners_path)

245 246
    is_toplevel = path == 'OWNERS'

247
    comment = []
248
    dirpath = self.os_path.dirname(path)
249
    in_comment = False
250 251 252
    # We treat the beginning of the file as an blank line.
    previous_line_was_blank = True
    reset_comment_after_use = False
253
    lineno = 0
254 255 256 257 258 259 260

    if path in self.override_files:
      file_iter = self.override_files[path]
    else:
      file_iter = self.fopen(owners_path)

    for line in file_iter:
261 262
      lineno += 1
      line = line.strip()
263
      if line.startswith('#'):
264 265 266 267 268
        if is_toplevel:
          m = re.match('#\s*OWNERS_STATUS\s+=\s+(.+)$', line)
          if m:
            self._status_file = m.group(1).strip()
            continue
269 270
        if not in_comment:
          comment = []
271
          reset_comment_after_use = not previous_line_was_blank
272 273
        comment.append(line[1:].strip())
        in_comment = True
274
        continue
275 276
      in_comment = False

277
      if line == '':
278 279
        comment = []
        previous_line_was_blank = True
280 281
        continue

282 283 284 285 286 287
      # If the line ends with a comment, strip the comment and store it for this
      # line only.
      line, _, line_comment = line.partition('#')
      line = line.strip()
      line_comment = [line_comment.strip()] if line_comment else []

288
      previous_line_was_blank = False
289
      if line == 'set noparent':
290
        self._stop_looking.add(dirpath)
291
        continue
292

293
      m = re.match('per-file (.+)=(.+)', line)
294
      if m:
295 296
        glob_string = m.group(1).strip()
        directive = m.group(2).strip()
297
        full_glob_string = self.os_path.join(self.root, dirpath, glob_string)
298
        if '/' in glob_string or '\\' in glob_string:
299
          raise SyntaxErrorInOwnersFile(owners_path, lineno,
300 301
              'per-file globs cannot span directories or use escapes: "%s"' %
              line)
302
        relative_glob_string = self.os_path.relpath(full_glob_string, self.root)
303
        self._add_entry(relative_glob_string, directive, owners_path,
304
                        lineno, '\n'.join(comment + line_comment))
305 306
        if reset_comment_after_use:
          comment = []
307 308
        continue

309 310 311
      if line.startswith('set '):
        raise SyntaxErrorInOwnersFile(owners_path, lineno,
            'unknown option: "%s"' % line[4:].strip())
312

313
      self._add_entry(dirpath, line, owners_path, lineno,
314
                      ' '.join(comment + line_comment))
315 316
      if reset_comment_after_use:
        comment = []
317

318
  def _read_global_comments(self):
319 320 321 322 323
    if not self._status_file:
      if not 'OWNERS' in self.read_files:
        self._read_owners('OWNERS')
      if not self._status_file:
        return
324

325
    owners_status_path = self.os_path.join(self.root, self._status_file)
326
    if not self.os_path.exists(owners_status_path):
327
      raise IOError('Could not find global status file "%s"' %
328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
                    owners_status_path)

    if owners_status_path in self.read_files:
      return

    self.read_files.add(owners_status_path)

    lineno = 0
    for line in self.fopen(owners_status_path):
      lineno += 1
      line = line.strip()
      if line.startswith('#'):
        continue
      if line == '':
        continue

      m = re.match('(.+?):(.+)', line)
      if m:
        owner = m.group(1).strip()
        comment = m.group(2).strip()
        if not self.email_regexp.match(owner):
          raise SyntaxErrorInOwnersFile(owners_status_path, lineno,
              'invalid email address: "%s"' % owner)

        self.comments.setdefault(owner, {})
        self.comments[owner][GLOBAL_STATUS] = comment
        continue

      raise SyntaxErrorInOwnersFile(owners_status_path, lineno,
          'cannot parse status entry: "%s"' % line.strip())

359
  def _add_entry(self, owned_paths, directive, owners_path, lineno, comment):
360
    if directive == 'set noparent':
361
      self._stop_looking.add(owned_paths)
362
    elif directive.startswith('file:'):
363
      include_file = self._resolve_include(directive[5:], owners_path, lineno)
364
      if not include_file:
365 366 367
        raise SyntaxErrorInOwnersFile(owners_path, lineno,
            ('%s does not refer to an existing file.' % directive[5:]))

368 369 370 371
      included_owners = self._read_just_the_owners(include_file)
      for owner in included_owners:
        self._owners_to_paths.setdefault(owner, set()).add(owned_paths)
        self._paths_to_owners.setdefault(owned_paths, set()).add(owner)
372
    elif self.email_regexp.match(directive) or directive == EVERYONE:
373 374 375
      if comment:
        self.comments.setdefault(directive, {})
        self.comments[directive][owned_paths] = comment
376 377
      self._owners_to_paths.setdefault(directive, set()).add(owned_paths)
      self._paths_to_owners.setdefault(owned_paths, set()).add(directive)
378
    else:
379
      raise SyntaxErrorInOwnersFile(owners_path, lineno,
380 381
          ('"%s" is not a "set noparent", file include, "*", '
           'or an email address.' % (directive,)))
382

383
  def _resolve_include(self, path, start, lineno):
384 385 386 387
    if path.startswith('//'):
      include_path = path[2:]
    else:
      assert start.startswith(self.root)
388
      start = self.os_path.dirname(self.os_path.relpath(start, self.root))
389 390
      include_path = self.os_path.join(start, path)

391 392 393
    if include_path in self.override_files:
      return include_path

394
    owners_path = self.os_path.join(self.root, include_path)
395 396 397 398 399 400 401
    # Paths included via "file:" must end in OWNERS or _OWNERS. Files that can
    # affect ownership have a different set of ownership rules, so that users
    # cannot self-approve changes adding themselves to an OWNERS file.
    if not (owners_path.endswith('/OWNERS') or owners_path.endswith('_OWNERS')):
      raise SyntaxErrorInOwnersFile(start, lineno, 'file: include must specify '
                                    'a file named OWNERS or ending in _OWNERS')

402 403 404 405 406
    if not self.os_path.exists(owners_path):
      return None

    return include_path

407 408 409 410 411 412 413
  def _read_just_the_owners(self, include_file):
    if include_file in self._included_files:
      return self._included_files[include_file]

    owners = set()
    self._included_files[include_file] = owners
    lineno = 0
414 415 416 417 418
    if include_file in self.override_files:
      file_iter = self.override_files[include_file]
    else:
      file_iter = self.fopen(self.os_path.join(self.root, include_file))
    for line in file_iter:
419 420 421 422 423 424 425 426 427 428 429
      lineno += 1
      line = line.strip()
      if (line.startswith('#') or line == '' or
              line.startswith('set noparent') or
              line.startswith('per-file')):
        continue

      if self.email_regexp.match(line) or line == EVERYONE:
        owners.add(line)
        continue
      if line.startswith('file:'):
430
        sub_include_file = self._resolve_include(line[5:], include_file, lineno)
431 432 433 434 435 436 437 438 439
        sub_owners = self._read_just_the_owners(sub_include_file)
        owners.update(sub_owners)
        continue

      raise SyntaxErrorInOwnersFile(include_file, lineno,
          ('"%s" is not a "set noparent", file include, "*", '
           'or an email address.' % (line,)))
    return owners

440
  def _covering_set_of_owners_for(self, files, author):
    """Picks a set of owners that covers the directories of all the files.

    Repeatedly takes the owner chosen by lowest_cost_owner() and removes the
    directories that owner covers, until no directory or no candidate owner
    is left.

    Args:
      files: iterable of paths relative to self.root.
      author: owner to exclude from the candidates, if nonempty.

    Returns:
      The set of chosen owners; it may contain the wildcard owner.
    """
    dirs_remaining = set(self.enclosing_dir_with_owners(f) for f in files)
    all_possible_owners = self.all_possible_owners(dirs_remaining, author)
    suggested_owners = set()
    while dirs_remaining and all_possible_owners:
      owner = self.lowest_cost_owner(all_possible_owners, dirs_remaining)
      suggested_owners.add(owner)
      dirs_to_remove = set(el[0] for el in all_possible_owners[owner])
      dirs_remaining -= dirs_to_remove
      # Now that we've used `owner` and covered all their dirs, remove them
      # from consideration.
      del all_possible_owners[owner]
      # Iterate over a snapshot: entries are deleted inside the loop, which
      # raises RuntimeError on Python 3 when iterating the live items() view.
      for o, dirs in list(all_possible_owners.items()):
        new_dirs = [(d, dist) for (d, dist) in dirs if d not in dirs_to_remove]
        if not new_dirs:
          del all_possible_owners[o]
        else:
          all_possible_owners[o] = new_dirs
    return suggested_owners

460
  def all_possible_owners(self, dirs, author):
461 462 463 464 465
    """Returns a dict of {potential owner: (dir, distance)} mappings.

    A distance of 1 is the lowest/closest possible distance (which makes the
    subsequent math easier).
    """
466
    all_possible_owners = {}
467 468
    for current_dir in dirs:
      dirname = current_dir
469 470
      distance = 1
      while True:
471
        for owner in self._owners_for(dirname):
472 473
          if author and owner == author:
            continue
474 475 476 477 478
          all_possible_owners.setdefault(owner, [])
          # If the same person is in multiple OWNERS files above a given
          # directory, only count the closest one.
          if not any(current_dir == el[0] for el in all_possible_owners[owner]):
            all_possible_owners[owner].append((current_dir, distance))
479
        if self._should_stop_looking(dirname):
480 481
          break
        dirname = self.os_path.dirname(dirname)
482 483 484
        distance += 1
    return all_possible_owners

485 486 487 488 489 490 491 492
  def _fnmatch(self, filename, pattern):
    """Same as fnmatch.fnmatch(), but interally caches the compiled regexes."""
    matcher = self._fnmatch_cache.get(pattern)
    if matcher is None:
      matcher = re.compile(fnmatch.translate(pattern)).match
      self._fnmatch_cache[pattern] = matcher
    return matcher(filename)

493
  @staticmethod
494
  def total_costs_by_owner(all_possible_owners, dirs):
495 496 497 498 499
    # We want to minimize both the number of reviewers and the distance
    # from the files/dirs needing reviews. The "pow(X, 1.75)" below is
    # an arbitrarily-selected scaling factor that seems to work well - it
    # will select one reviewer in the parent directory over three reviewers
    # in subdirs, but not one reviewer over just two.
500
    result = {}
501 502 503 504 505 506 507
    for owner in all_possible_owners:
      total_distance = 0
      num_directories_owned = 0
      for dirname, distance in all_possible_owners[owner]:
        if dirname in dirs:
          total_distance += distance
          num_directories_owned += 1
508 509 510 511
      if num_directories_owned:
        result[owner] = (total_distance /
                         pow(num_directories_owned, 1.75))
    return result
512

513 514 515 516
  @staticmethod
  def lowest_cost_owner(all_possible_owners, dirs):
    total_costs_by_owner = Database.total_costs_by_owner(all_possible_owners,
                                                         dirs)
517 518 519
    # Return the lowest cost owner. In the case of a tie, pick one randomly.
    lowest_cost = min(total_costs_by_owner.itervalues())
    lowest_cost_owners = filter(
520 521
        lambda owner: total_costs_by_owner[owner] == lowest_cost,
        total_costs_by_owner)
522
    return random.Random().choice(lowest_cost_owners)