Commit 4da8a3d5 authored by Josip Sokcevic's avatar Josip Sokcevic Committed by LUCI CQ

Fix bot_update hanging on exception

This change starts observers as soon as possible, and wraps the entire
block that can cause an exception in try..finally. This ensures that we
stop threads are stopped.

If we notice the same behavior, it could be worth to put a cap on number
of executions per observer, and shut the thread down once reached.

R=apolito@google.com

Bug: 1255228
Change-Id: I1f165267460da02b3bbba39022c262e6c29fe86b
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/3213451Reviewed-by: 's avatarAnthony Polito <apolito@google.com>
Commit-Queue: Josip Sokcevic <sokcevic@google.com>
parent 59e3296a
...@@ -134,6 +134,7 @@ class RepeatingTimer(threading.Thread): ...@@ -134,6 +134,7 @@ class RepeatingTimer(threading.Thread):
self.kwargs = kwargs if kwargs is not None else {} self.kwargs = kwargs if kwargs is not None else {}
self.cond = threading.Condition() self.cond = threading.Condition()
self.is_shutdown = False self.is_shutdown = False
self.is_reset = False
def reset(self): def reset(self):
"""Resets timer interval.""" """Resets timer interval."""
...@@ -191,6 +192,8 @@ def call(*args, **kwargs): # pragma: no cover ...@@ -191,6 +192,8 @@ def call(*args, **kwargs): # pragma: no cover
env = os.environ.copy() env = os.environ.copy()
env.update(new_env) env.update(new_env)
kwargs['env'] = env kwargs['env'] = env
stale_process_duration = env.get('STALE_PROCESS_DURATION',
STALE_PROCESS_DURATION)
if new_env: if new_env:
print('===Injecting Environment Variables===') print('===Injecting Environment Variables===')
...@@ -200,40 +203,46 @@ def call(*args, **kwargs): # pragma: no cover ...@@ -200,40 +203,46 @@ def call(*args, **kwargs): # pragma: no cover
print('In directory: %s' % cwd) print('In directory: %s' % cwd)
start_time = time.time() start_time = time.time()
proc = subprocess.Popen(args, **kwargs) proc = subprocess.Popen(args, **kwargs)
if stdin_data:
proc.stdin.write(stdin_data)
proc.stdin.close()
stale_process_duration = env.get('STALE_PROCESS_DURATION',
STALE_PROCESS_DURATION)
observers = [ observers = [
RepeatingTimer(300, _print_pstree), RepeatingTimer(300, _print_pstree),
RepeatingTimer(int(stale_process_duration), _terminate_process, [proc])] RepeatingTimer(int(stale_process_duration), _terminate_process, [proc])]
for observer in observers: for observer in observers:
observer.start() observer.start()
# This is here because passing 'sys.stdout' into stdout for proc will
# produce out of order output. try:
hanging_cr = False # If there's an exception in this block, we need to stop all observers.
while True: # Otherwise, observers will be spinning and main process won't exit while
for observer in observers: # the main thread will be doing nothing.
observer.reset() if stdin_data:
buf = proc.stdout.read(BUF_SIZE) proc.stdin.write(stdin_data)
if not buf: proc.stdin.close()
break # This is here because passing 'sys.stdout' into stdout for proc will
if hanging_cr: # produce out of order output.
buf = '\r' + buf hanging_cr = False
hanging_cr = buf.endswith('\r') while True:
for observer in observers:
observer.reset()
buf = proc.stdout.read(BUF_SIZE)
if not buf:
break
if hanging_cr:
buf = '\r' + buf
hanging_cr = buf.endswith(b'\r')
if hanging_cr:
buf = buf[:-1]
buf = buf.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
_stdout_write(buf)
out.write(buf)
if hanging_cr: if hanging_cr:
buf = buf[:-1] _stdout_write(b'\n')
buf = buf.replace('\r\n', '\n').replace('\r', '\n') out.write(b'\n')
_stdout_write(buf)
out.write(buf) code = proc.wait()
if hanging_cr: finally:
_stdout_write('\n') for observer in observers:
out.write('\n') observer.shutdown()
for observer in observers:
observer.shutdown()
code = proc.wait()
elapsed_time = ((time.time() - start_time) / 60.0) elapsed_time = ((time.time() - start_time) / 60.0)
outval = out.getvalue().decode('utf-8') outval = out.getvalue().decode('utf-8')
if code: if code:
......
...@@ -251,7 +251,7 @@ class BotUpdateUnittests(unittest.TestCase): ...@@ -251,7 +251,7 @@ class BotUpdateUnittests(unittest.TestCase):
bot_update.ensure_checkout(**self.params) bot_update.ensure_checkout(**self.params)
args = self.gclient.records[0] args = self.gclient.records[0]
patch_refs = set( patch_refs = set(
args[i+1] for i in xrange(len(args)) args[i+1] for i in range(len(args))
if args[i] == '--patch-ref' and i+1 < len(args)) if args[i] == '--patch-ref' and i+1 < len(args))
self.assertIn(self.params['patch_refs'][0], patch_refs) self.assertIn(self.params['patch_refs'][0], patch_refs)
self.assertIn(self.params['patch_refs'][1], patch_refs) self.assertIn(self.params['patch_refs'][1], patch_refs)
...@@ -286,6 +286,10 @@ class BotUpdateUnittests(unittest.TestCase): ...@@ -286,6 +286,10 @@ class BotUpdateUnittests(unittest.TestCase):
actual_results = bot_update.parse_revisions(revisions, 'root') actual_results = bot_update.parse_revisions(revisions, 'root')
self.assertEqual(expected_results, actual_results) self.assertEqual(expected_results, actual_results)
class CallUnitTest(unittest.TestCase):
def testCall(self):
ret = bot_update.call(sys.executable, '-c', 'print(1)')
self.assertEqual(u'1\n', ret)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment