#!/usr/bin/env python # Copyright (c) 2011 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Watchlists Watchlists is a mechanism that allow a developer (a "watcher") to watch over portions of code that he is interested in. A "watcher" will be cc-ed to changes that modify that portion of code, thereby giving him an opportunity to make comments on codereview.chromium.org even before the change is committed. Refer: http://dev.chromium.org/developers/contributing-code/watchlists When invoked directly from the base of a repository, this script lists out the watchers for files given on the command line. This is useful to verify changes to WATCHLISTS files. """ from __future__ import print_function import logging import os import re import sys class Watchlists(object): """Manage Watchlists. This class provides mechanism to load watchlists for a repo and identify watchers. Usage: wl = Watchlists("/path/to/repo/root") watchers = wl.GetWatchersForPaths(["/path/to/file1", "/path/to/file2",]) """ _RULES = "WATCHLISTS" _RULES_FILENAME = _RULES _repo_root = None _defns = {} # Definitions _path_regexps = {} # Name -> Regular expression mapping _watchlists = {} # name to email mapping def __init__(self, repo_root): self._repo_root = repo_root self._LoadWatchlistRules() def _GetRulesFilePath(self): """Returns path to WATCHLISTS file.""" return os.path.join(self._repo_root, self._RULES_FILENAME) def _HasWatchlistsFile(self): """Determine if watchlists are available for this repo.""" return os.path.exists(self._GetRulesFilePath()) def _ContentsOfWatchlistsFile(self): """Read the WATCHLISTS file and return its contents.""" try: watchlists_file = open(self._GetRulesFilePath()) contents = watchlists_file.read() watchlists_file.close() return contents except IOError as e: logging.error("Cannot read %s: %s" % (self._GetRulesFilePath(), e)) return '' def _LoadWatchlistRules(self): """Load watchlists from WATCHLISTS file. Does nothing if not present.""" if not self._HasWatchlistsFile(): return contents = self._ContentsOfWatchlistsFile() watchlists_data = None try: watchlists_data = eval(contents, {'__builtins__': None}, None) except SyntaxError as e: logging.error("Cannot parse %s. %s" % (self._GetRulesFilePath(), e)) return defns = watchlists_data.get("WATCHLIST_DEFINITIONS") if not defns: logging.error("WATCHLIST_DEFINITIONS not defined in %s" % self._GetRulesFilePath()) return watchlists = watchlists_data.get("WATCHLISTS") if not watchlists: logging.error("WATCHLISTS not defined in %s" % self._GetRulesFilePath()) return self._defns = defns self._watchlists = watchlists # Compile the regular expressions ahead of time to avoid creating them # on-the-fly multiple times per file. self._path_regexps = {} for name, rule in defns.iteritems(): filepath = rule.get('filepath') if not filepath: continue self._path_regexps[name] = re.compile(filepath) # Verify that all watchlist names are defined for name in watchlists: if name not in defns: logging.error("%s not defined in %s" % (name, self._GetRulesFilePath())) def GetWatchersForPaths(self, paths): """Fetch the list of watchers for |paths| Args: paths: [path1, path2, ...] Returns: [u1@chromium.org, u2@gmail.com, ...] """ watchers = set() # A set, to avoid duplicates for path in paths: path = path.replace(os.sep, '/') for name, rule in self._path_regexps.iteritems(): if name not in self._watchlists: continue if rule.search(path): map(watchers.add, self._watchlists[name]) return list(watchers) def main(argv): # Confirm that watchlists can be parsed and spew out the watchers if len(argv) < 2: print("Usage (from the base of repo):") print(" %s [file-1] [file-2] ...." % argv[0]) return 1 wl = Watchlists(os.getcwd()) watchers = wl.GetWatchersForPaths(argv[1:]) print(watchers) if __name__ == '__main__': main(sys.argv)