Commit 12b07e7e authored by szager@chromium.org's avatar szager@chromium.org

Refactor nag functionality in to NagTimer class.

Add default 30 second nag timer to gclient subprocesses.

BUG=227537

Review URL: https://chromiumcodereview.appspot.com/14826003

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@198207 0039d316-1c4b-4281-b951-d872f2087c98
parent 2fd6c3fc
...@@ -116,6 +116,9 @@ class SCMWrapper(object): ...@@ -116,6 +116,9 @@ class SCMWrapper(object):
This is the abstraction layer to bind to different SCM. This is the abstraction layer to bind to different SCM.
""" """
nag_timer = 30
nag_max = 3
def __init__(self, url=None, root_dir=None, relpath=None): def __init__(self, url=None, root_dir=None, relpath=None):
self.url = url self.url = url
self._root_dir = root_dir self._root_dir = root_dir
...@@ -195,6 +198,8 @@ class GitWrapper(SCMWrapper): ...@@ -195,6 +198,8 @@ class GitWrapper(SCMWrapper):
gclient_utils.CheckCallAndFilter( gclient_utils.CheckCallAndFilter(
['git', 'diff', merge_base], ['git', 'diff', merge_base],
cwd=self.checkout_path, cwd=self.checkout_path,
nag_timer=self.nag_timer,
nag_max=self.nag_max,
filter_fn=GitDiffFilterer(self.relpath).Filter) filter_fn=GitDiffFilterer(self.relpath).Filter)
def UpdateSubmoduleConfig(self): def UpdateSubmoduleConfig(self):
...@@ -208,6 +213,8 @@ class GitWrapper(SCMWrapper): ...@@ -208,6 +213,8 @@ class GitWrapper(SCMWrapper):
cmd4 = ['git', 'config', 'fetch.recurseSubmodules', 'false'] cmd4 = ['git', 'config', 'fetch.recurseSubmodules', 'false']
kwargs = {'cwd': self.checkout_path, kwargs = {'cwd': self.checkout_path,
'print_stdout': False, 'print_stdout': False,
'nag_timer': self.nag_timer,
'nag_max': self.nag_max,
'filter_fn': lambda x: None} 'filter_fn': lambda x: None}
try: try:
gclient_utils.CheckCallAndFilter(cmd, **kwargs) gclient_utils.CheckCallAndFilter(cmd, **kwargs)
...@@ -852,6 +859,8 @@ class GitWrapper(SCMWrapper): ...@@ -852,6 +859,8 @@ class GitWrapper(SCMWrapper):
return subprocess2.check_output( return subprocess2.check_output(
['git'] + args, ['git'] + args,
stderr=subprocess2.PIPE, stderr=subprocess2.PIPE,
nag_timer=self.nag_timer,
nag_max=self.nag_max,
cwd=self.checkout_path).strip() cwd=self.checkout_path).strip()
def _UpdateBranchHeads(self, options, fetch=False): def _UpdateBranchHeads(self, options, fetch=False):
...@@ -879,6 +888,8 @@ class GitWrapper(SCMWrapper): ...@@ -879,6 +888,8 @@ class GitWrapper(SCMWrapper):
def _Run(self, args, options, **kwargs): def _Run(self, args, options, **kwargs):
kwargs.setdefault('cwd', self.checkout_path) kwargs.setdefault('cwd', self.checkout_path)
kwargs.setdefault('print_stdout', True) kwargs.setdefault('print_stdout', True)
kwargs.setdefault('nag_timer', self.nag_timer)
kwargs.setdefault('nag_max', self.nag_max)
stdout = kwargs.get('stdout', sys.stdout) stdout = kwargs.get('stdout', sys.stdout)
stdout.write('\n________ running \'git %s\' in \'%s\'\n' % ( stdout.write('\n________ running \'git %s\' in \'%s\'\n' % (
' '.join(args), kwargs['cwd'])) ' '.join(args), kwargs['cwd']))
...@@ -928,6 +939,8 @@ class SVNWrapper(SCMWrapper): ...@@ -928,6 +939,8 @@ class SVNWrapper(SCMWrapper):
['svn', 'diff', '-x', '--ignore-eol-style'] + args, ['svn', 'diff', '-x', '--ignore-eol-style'] + args,
cwd=self.checkout_path, cwd=self.checkout_path,
print_stdout=False, print_stdout=False,
nag_timer=self.nag_timer,
nag_max=self.nag_max,
filter_fn=SvnDiffFilterer(self.relpath).Filter) filter_fn=SvnDiffFilterer(self.relpath).Filter)
def update(self, options, args, file_list): def update(self, options, args, file_list):
...@@ -1225,6 +1238,8 @@ class SVNWrapper(SCMWrapper): ...@@ -1225,6 +1238,8 @@ class SVNWrapper(SCMWrapper):
def _Run(self, args, options, **kwargs): def _Run(self, args, options, **kwargs):
"""Runs a commands that goes to stdout.""" """Runs a commands that goes to stdout."""
kwargs.setdefault('cwd', self.checkout_path) kwargs.setdefault('cwd', self.checkout_path)
kwargs.setdefault('nag_timer', self.nag_timer)
kwargs.setdefault('nag_max', self.nag_max)
gclient_utils.CheckCallAndFilterAndHeader(['svn'] + args, gclient_utils.CheckCallAndFilterAndHeader(['svn'] + args,
always=options.verbose, **kwargs) always=options.verbose, **kwargs)
......
...@@ -375,7 +375,7 @@ class GClientChildren(object): ...@@ -375,7 +375,7 @@ class GClientChildren(object):
def CheckCallAndFilter(args, stdout=None, filter_fn=None, def CheckCallAndFilter(args, stdout=None, filter_fn=None,
print_stdout=None, call_filter_on_first_line=False, print_stdout=None, call_filter_on_first_line=False,
**kwargs): nag_timer=None, nag_max=None, **kwargs):
"""Runs a command and calls back a filter function if needed. """Runs a command and calls back a filter function if needed.
Accepts all subprocess2.Popen() parameters plus: Accepts all subprocess2.Popen() parameters plus:
...@@ -399,6 +399,21 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None, ...@@ -399,6 +399,21 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None,
# Do a flush of stdout before we begin reading from the subprocess2's stdout # Do a flush of stdout before we begin reading from the subprocess2's stdout
stdout.flush() stdout.flush()
nag = None
if nag_timer:
# Hack thread.index to force correct annotation.
index = getattr(threading.currentThread(), 'index', 0)
def _nag_cb(elapsed):
setattr(threading.currentThread(), 'index', index)
stdout.write(' No output for %.0f seconds from command:\n' % elapsed)
stdout.write(' %s\n' % kid.cmd_str)
if (nag_max and
int('%.0f' % (elapsed / nag_timer)) >= nag_max):
stdout.write(' ... killing it!\n')
kid.kill()
nag = subprocess2.NagTimer(nag_timer, _nag_cb)
nag.start()
# Also, we need to forward stdout to prevent weird re-ordering of output. # Also, we need to forward stdout to prevent weird re-ordering of output.
# This has to be done on a per byte basis to make sure it is not buffered: # This has to be done on a per byte basis to make sure it is not buffered:
# normally buffering is done for each line, but if svn requests input, no # normally buffering is done for each line, but if svn requests input, no
...@@ -406,6 +421,8 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None, ...@@ -406,6 +421,8 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None,
try: try:
in_byte = kid.stdout.read(1) in_byte = kid.stdout.read(1)
if in_byte: if in_byte:
if nag:
nag.event()
if call_filter_on_first_line: if call_filter_on_first_line:
filter_fn(None) filter_fn(None)
in_line = '' in_line = ''
...@@ -422,6 +439,8 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None, ...@@ -422,6 +439,8 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None,
filter_fn(in_line) filter_fn(in_line)
in_line = '' in_line = ''
in_byte = kid.stdout.read(1) in_byte = kid.stdout.read(1)
if in_byte and nag:
nag.event()
# Flush the rest of buffered output. This is only an issue with # Flush the rest of buffered output. This is only an issue with
# stdout/stderr not ending with a \n. # stdout/stderr not ending with a \n.
if len(in_line): if len(in_line):
...@@ -435,6 +454,9 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None, ...@@ -435,6 +454,9 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None,
except KeyboardInterrupt: except KeyboardInterrupt:
print >> sys.stderr, 'Failed while running "%s"' % ' '.join(args) print >> sys.stderr, 'Failed while running "%s"' % ' '.join(args)
raise raise
finally:
if nag:
nag.cancel()
if rv: if rv:
raise subprocess2.CalledProcessError( raise subprocess2.CalledProcessError(
......
...@@ -132,6 +132,42 @@ def get_english_env(env): ...@@ -132,6 +132,42 @@ def get_english_env(env):
return env return env
class NagTimer(object):
"""
Triggers a callback when a time interval passes without an event being fired.
For example, the event could be receiving terminal output from a subprocess;
and the callback could print a warning to stderr that the subprocess appeared
to be hung.
"""
def __init__(self, interval, cb):
self.interval = interval
self.cb = cb
self.timer = threading.Timer(self.interval, self.fn)
self.last_output = self.previous_last_output = 0
def start(self):
self.last_output = self.previous_last_output = time.time()
self.timer.start()
def event(self):
self.last_output = time.time()
def fn(self):
now = time.time()
if self.last_output == self.previous_last_output:
self.cb(now - self.previous_last_output)
# Use 0.1 fudge factor, just in case
# (self.last_output - now) is very close to zero.
sleep_time = (self.last_output - now - 0.1) % self.interval
self.previous_last_output = self.last_output
self.timer = threading.Timer(sleep_time + 0.1, self.fn)
self.timer.start()
def cancel(self):
self.timer.cancel()
class Popen(subprocess.Popen): class Popen(subprocess.Popen):
"""Wraps subprocess.Popen() with various workarounds. """Wraps subprocess.Popen() with various workarounds.
...@@ -192,6 +228,7 @@ class Popen(subprocess.Popen): ...@@ -192,6 +228,7 @@ class Popen(subprocess.Popen):
self.start = time.time() self.start = time.time()
self.timeout = None self.timeout = None
self.nag_timer = None self.nag_timer = None
self.nag_max = None
self.shell = kwargs.get('shell', None) self.shell = kwargs.get('shell', None)
# Silence pylint on MacOSX # Silence pylint on MacOSX
self.returncode = None self.returncode = None
...@@ -230,8 +267,7 @@ class Popen(subprocess.Popen): ...@@ -230,8 +267,7 @@ class Popen(subprocess.Popen):
# because of memory exhaustion. # because of memory exhaustion.
queue = Queue.Queue() queue = Queue.Queue()
done = threading.Event() done = threading.Event()
timer = [] nag = None
last_output = [time.time()] * 2
def write_stdin(): def write_stdin():
try: try:
...@@ -253,28 +289,12 @@ class Popen(subprocess.Popen): ...@@ -253,28 +289,12 @@ class Popen(subprocess.Popen):
data = pipe.read(1) data = pipe.read(1)
if not data: if not data:
break break
last_output[0] = time.time() if nag:
nag.event()
queue.put((name, data)) queue.put((name, data))
finally: finally:
queue.put(name) queue.put(name)
def nag_fn():
now = time.time()
if done.is_set():
return
if last_output[0] == last_output[1]:
logging.warn(' No output for %.0f seconds from command:' % (
now - last_output[1]))
logging.warn(' %s' % self.cmd_str)
# Use 0.1 fudge factor in case:
# now ~= last_output[0] + self.nag_timer
sleep_time = self.nag_timer + last_output[0] - now - 0.1
while sleep_time < 0:
sleep_time += self.nag_timer
last_output[1] = last_output[0]
timer[0] = threading.Timer(sleep_time, nag_fn)
timer[0].start()
def timeout_fn(): def timeout_fn():
try: try:
done.wait(self.timeout) done.wait(self.timeout)
...@@ -313,8 +333,15 @@ class Popen(subprocess.Popen): ...@@ -313,8 +333,15 @@ class Popen(subprocess.Popen):
t.start() t.start()
if self.nag_timer: if self.nag_timer:
timer.append(threading.Timer(self.nag_timer, nag_fn)) def _nag_cb(elapsed):
timer[0].start() logging.warn(' No output for %.0f seconds from command:' % elapsed)
logging.warn(' %s' % self.cmd_str)
if (self.nag_max and
int('%.0f' % (elapsed / self.nag_timer)) >= self.nag_max):
queue.put('timeout')
done.set() # Must do this so that timeout thread stops waiting.
nag = NagTimer(self.nag_timer, _nag_cb)
nag.start()
timed_out = False timed_out = False
try: try:
...@@ -327,20 +354,22 @@ class Popen(subprocess.Popen): ...@@ -327,20 +354,22 @@ class Popen(subprocess.Popen):
self.stderr_cb(item[1]) self.stderr_cb(item[1])
else: else:
# A thread terminated. # A thread terminated.
threads[item].join() if item in threads:
del threads[item] threads[item].join()
del threads[item]
if item == 'wait': if item == 'wait':
# Terminate the timeout thread if necessary. # Terminate the timeout thread if necessary.
done.set() done.set()
elif item == 'timeout' and not timed_out and self.poll() is None: elif item == 'timeout' and not timed_out and self.poll() is None:
logging.debug('Timed out after %fs: killing' % self.timeout) logging.debug('Timed out after %.0fs: killing' % (
time.time() - self.start))
self.kill() self.kill()
timed_out = True timed_out = True
finally: finally:
# Stop the threads. # Stop the threads.
done.set() done.set()
if timer: if nag:
timer[0].cancel() nag.cancel()
if 'wait' in threads: if 'wait' in threads:
# Accelerate things, otherwise it would hang until the child process is # Accelerate things, otherwise it would hang until the child process is
# done. # done.
...@@ -353,7 +382,8 @@ class Popen(subprocess.Popen): ...@@ -353,7 +382,8 @@ class Popen(subprocess.Popen):
self.returncode = TIMED_OUT self.returncode = TIMED_OUT
# pylint: disable=W0221,W0622 # pylint: disable=W0221,W0622
def communicate(self, input=None, timeout=None, nag_timer=None): def communicate(self, input=None, timeout=None, nag_timer=None,
nag_max=None):
"""Adds timeout and callbacks support. """Adds timeout and callbacks support.
Returns (stdout, stderr) like subprocess.Popen().communicate(). Returns (stdout, stderr) like subprocess.Popen().communicate().
...@@ -365,6 +395,7 @@ class Popen(subprocess.Popen): ...@@ -365,6 +395,7 @@ class Popen(subprocess.Popen):
""" """
self.timeout = timeout self.timeout = timeout
self.nag_timer = nag_timer self.nag_timer = nag_timer
self.nag_max = nag_max
if (not self.timeout and not self.nag_timer and if (not self.timeout and not self.nag_timer and
not self.stdout_cb and not self.stderr_cb): not self.stdout_cb and not self.stderr_cb):
return super(Popen, self).communicate(input) return super(Popen, self).communicate(input)
...@@ -393,7 +424,7 @@ class Popen(subprocess.Popen): ...@@ -393,7 +424,7 @@ class Popen(subprocess.Popen):
return (stdout, stderr) return (stdout, stderr)
def communicate(args, timeout=None, nag_timer=None, **kwargs): def communicate(args, timeout=None, nag_timer=None, nag_max=None, **kwargs):
"""Wraps subprocess.Popen().communicate() and add timeout support. """Wraps subprocess.Popen().communicate() and add timeout support.
Returns ((stdout, stderr), returncode). Returns ((stdout, stderr), returncode).
......
...@@ -107,6 +107,8 @@ class SVNWrapperTestCase(BaseTestCase): ...@@ -107,6 +107,8 @@ class SVNWrapperTestCase(BaseTestCase):
'RunCommand', 'RunCommand',
'cleanup', 'cleanup',
'diff', 'diff',
'nag_max',
'nag_timer',
'pack', 'pack',
'relpath', 'relpath',
'revert', 'revert',
...@@ -496,6 +498,8 @@ class SVNWrapperTestCase(BaseTestCase): ...@@ -496,6 +498,8 @@ class SVNWrapperTestCase(BaseTestCase):
gclient_scm.gclient_utils.CheckCallAndFilterAndHeader( gclient_scm.gclient_utils.CheckCallAndFilterAndHeader(
['svn', 'checkout', '--depth', 'empty', self.url, self.base_path], ['svn', 'checkout', '--depth', 'empty', self.url, self.base_path],
always=True, always=True,
nag_max=3,
nag_timer=30,
cwd=self.root_dir) cwd=self.root_dir)
gclient_scm.scm.SVN.RunAndGetFileList( gclient_scm.scm.SVN.RunAndGetFileList(
options.verbose, options.verbose,
...@@ -530,7 +534,7 @@ class SVNWrapperTestCase(BaseTestCase): ...@@ -530,7 +534,7 @@ class SVNWrapperTestCase(BaseTestCase):
files_list = self.mox.CreateMockAnything() files_list = self.mox.CreateMockAnything()
gclient_scm.gclient_utils.CheckCallAndFilterAndHeader( gclient_scm.gclient_utils.CheckCallAndFilterAndHeader(
['svn', 'export', join(self.url, 'DEPS'), join(self.base_path, 'DEPS')], ['svn', 'export', join(self.url, 'DEPS'), join(self.base_path, 'DEPS')],
always=True, cwd=self.root_dir) nag_timer=30, nag_max=3, always=True, cwd=self.root_dir)
self.mox.ReplayAll() self.mox.ReplayAll()
scm = self._scm_wrapper(url=self.url, root_dir=self.root_dir, scm = self._scm_wrapper(url=self.url, root_dir=self.root_dir,
...@@ -563,6 +567,8 @@ class SVNWrapperTestCase(BaseTestCase): ...@@ -563,6 +567,8 @@ class SVNWrapperTestCase(BaseTestCase):
gclient_scm.gclient_utils.CheckCallAndFilterAndHeader( gclient_scm.gclient_utils.CheckCallAndFilterAndHeader(
['svn', 'checkout', '--depth', 'empty', self.url, self.base_path], ['svn', 'checkout', '--depth', 'empty', self.url, self.base_path],
always=True, always=True,
nag_max=3,
nag_timer=30,
cwd=self.root_dir) cwd=self.root_dir)
gclient_scm.scm.SVN.RunAndGetFileList( gclient_scm.scm.SVN.RunAndGetFileList(
options.verbose, options.verbose,
...@@ -787,6 +793,8 @@ class ManagedGitWrapperTestCase(BaseGitWrapperTestCase): ...@@ -787,6 +793,8 @@ class ManagedGitWrapperTestCase(BaseGitWrapperTestCase):
'RunCommand', 'RunCommand',
'cleanup', 'cleanup',
'diff', 'diff',
'nag_max',
'nag_timer',
'pack', 'pack',
'UpdateSubmoduleConfig', 'UpdateSubmoduleConfig',
'relpath', 'relpath',
......
...@@ -78,7 +78,7 @@ class DefaultsTest(auto_stub.TestCase): ...@@ -78,7 +78,7 @@ class DefaultsTest(auto_stub.TestCase):
results['args'] = args results['args'] = args
@staticmethod @staticmethod
# pylint: disable=W0622 # pylint: disable=W0622
def communicate(input=None, timeout=None, nag_timer=None): def communicate(input=None, timeout=None, nag_max=None, nag_timer=None):
return None, None return None, None
self.mock(subprocess2, 'Popen', fake_Popen) self.mock(subprocess2, 'Popen', fake_Popen)
return results return results
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment