#!/usr/bin/env python
# Copyright (c) 2011 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Watchlists

Watchlists is a mechanism that allows a developer (a "watcher") to watch over
portions of code that he is interested in. A "watcher" will be cc-ed to
changes that modify that portion of code, thereby giving him an opportunity
to make comments on codereview.chromium.org even before the change is
committed.
Refer: http://dev.chromium.org/developers/contributing-code/watchlists

When invoked directly from the base of a repository, this script lists out
the watchers for files given on the command line. This is useful to verify
changes to WATCHLISTS files.
"""

20 21
from __future__ import print_function

22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
import logging
import os
import re
import sys


class Watchlists(object):
  """Manage Watchlists.

  Loads the WATCHLISTS rules file found at the root of a repository and
  resolves which watchers are associated with a given set of paths.

  Usage:
    wl = Watchlists("/path/to/repo/root")
    watchers = wl.GetWatchersForPaths(["/path/to/file1",
                                       "/path/to/file2",])
  """

  _RULES = "WATCHLISTS"
  _RULES_FILENAME = _RULES
  _repo_root = None
  # Class-level fallbacks only; __init__ gives every instance its own dicts so
  # that state is never shared between instances.
  _defns = {}         # Definitions (name -> rule dict)
  _path_regexps = {}  # Name -> compiled regular expression mapping
  _watchlists = {}    # Name -> list of watcher emails mapping

  def __init__(self, repo_root):
    """Load the watchlist rules for the repository rooted at |repo_root|.

    Args:
      repo_root: Path of the directory expected to contain the WATCHLISTS file.
    """
    self._repo_root = repo_root
    self._defns = {}
    self._path_regexps = {}
    self._watchlists = {}
    self._LoadWatchlistRules()

  def _GetRulesFilePath(self):
    """Returns path to WATCHLISTS file."""
    return os.path.join(self._repo_root, self._RULES_FILENAME)

  def _HasWatchlistsFile(self):
    """Determine if watchlists are available for this repo."""
    return os.path.exists(self._GetRulesFilePath())

  def _ContentsOfWatchlistsFile(self):
    """Read the WATCHLISTS file and return its contents.

    Returns:
      The file contents as a string, or '' (after logging an error) if the
      file cannot be read.
    """
    try:
      # 'with' guarantees the handle is closed even if read() raises.
      with open(self._GetRulesFilePath()) as watchlists_file:
        return watchlists_file.read()
    except IOError as e:
      logging.error("Cannot read %s: %s", self._GetRulesFilePath(), e)
      return ''

  def _LoadWatchlistRules(self):
    """Load watchlists from WATCHLISTS file. Does nothing if not present.

    On any problem (unparsable file, missing WATCHLIST_DEFINITIONS or
    WATCHLISTS entry) an error is logged and the instance is left with no
    rules loaded.
    """
    if not self._HasWatchlistsFile():
      return

    contents = self._ContentsOfWatchlistsFile()
    watchlists_data = None
    try:
      # NOTE(security): the rules file is evaluated as a Python expression.
      # Builtins are disabled, but this is not a real sandbox; only use this
      # on WATCHLISTS files from a trusted checkout.
      watchlists_data = eval(contents, {'__builtins__': None}, None)
    except SyntaxError as e:
      logging.error("Cannot parse %s. %s", self._GetRulesFilePath(), e)
      return

    defns = watchlists_data.get("WATCHLIST_DEFINITIONS")
    if not defns:
      logging.error("WATCHLIST_DEFINITIONS not defined in %s",
                    self._GetRulesFilePath())
      return
    watchlists = watchlists_data.get("WATCHLISTS")
    if not watchlists:
      logging.error("WATCHLISTS not defined in %s", self._GetRulesFilePath())
      return
    self._defns = defns
    self._watchlists = watchlists

    # Compile the regular expressions ahead of time to avoid creating them
    # on-the-fly multiple times per file.
    self._path_regexps = {}
    # items() works on both Python 2 and 3 (iteritems() is Python 2 only).
    for name, rule in defns.items():
      filepath = rule.get('filepath')
      if not filepath:
        continue
      self._path_regexps[name] = re.compile(filepath)

    # Verify that all watchlist names are defined
    for name in watchlists:
      if name not in defns:
        logging.error("%s not defined in %s", name, self._GetRulesFilePath())

  def GetWatchersForPaths(self, paths):
    """Fetch the list of watchers for |paths|

    Args:
      paths: [path1, path2, ...]

    Returns:
      [u1@chromium.org, u2@gmail.com, ...] without duplicates, in no
      particular order.
    """
    watchers = set()  # A set, to avoid duplicates
    for path in paths:
      # Rules are written with forward slashes; normalize native separators.
      path = path.replace(os.sep, '/')
      for name, rule in self._path_regexps.items():
        if name not in self._watchlists:
          continue
        if rule.search(path):
          # update() adds eagerly; map() is lazy on Python 3 and would add
          # nothing.
          watchers.update(self._watchlists[name])
    return list(watchers)


def main(argv):
  # Confirm that watchlists can be parsed and spew out the watchers
  if len(argv) < 2:
131 132
    print("Usage (from the base of repo):")
    print("  %s [file-1] [file-2] ...." % argv[0])
133 134 135
    return 1
  wl = Watchlists(os.getcwd())
  watchers = wl.GetWatchersForPaths(argv[1:])
136
  print(watchers)
137 138 139 140


if __name__ == '__main__':
  # Propagate main()'s return value as the process exit status so that a
  # usage error is visible to the calling shell (None maps to exit code 0).
  sys.exit(main(sys.argv))