Commit 350a913f authored by Josip Sokcevic's avatar Josip Sokcevic Committed by LUCI CQ

Terminate stale bot_update process

Introduce a process observer that terminates child process if there is
not stdout activity. It can be overridden by an environment variable.

R=apolito@google.com, ehmaldonado@chromium.org

Bug: 1074355
Change-Id: I11de9d29e716587614cf336725c8d4a368a9d61d
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/2167220Reviewed-by: 's avatarEdward Lesmes <ehmaldonado@chromium.org>
Reviewed-by: 's avatarAnthony Polito <apolito@google.com>
Commit-Queue: Josip Sokcevic <sokcevic@google.com>
parent b806e17e
...@@ -33,6 +33,11 @@ import os.path as path ...@@ -33,6 +33,11 @@ import os.path as path
# How many bytes at a time to read from pipes. # How many bytes at a time to read from pipes.
BUF_SIZE = 256 BUF_SIZE = 256
# How many seconds of no stdout activity before process is considered stale. Can
# be overridden via environmnet variable `STALE_PROCESS_DURATION`. If set to 0,
# process won't be terminated.
STALE_PROCESS_DURATION = 1200
# Define a bunch of directory paths. # Define a bunch of directory paths.
# Relative to this script's filesystem path. # Relative to this script's filesystem path.
THIS_DIR = path.dirname(path.abspath(__file__)) THIS_DIR = path.dirname(path.abspath(__file__))
...@@ -81,9 +86,6 @@ cache_dir = r%(cache_dir)s ...@@ -81,9 +86,6 @@ cache_dir = r%(cache_dir)s
""" """
# How many times to try before giving up.
ATTEMPTS = 5
GIT_CACHE_PATH = path.join(DEPOT_TOOLS_DIR, 'git_cache.py') GIT_CACHE_PATH = path.join(DEPOT_TOOLS_DIR, 'git_cache.py')
GCLIENT_PATH = path.join(DEPOT_TOOLS_DIR, 'gclient.py') GCLIENT_PATH = path.join(DEPOT_TOOLS_DIR, 'gclient.py')
...@@ -111,7 +113,19 @@ OK = object() ...@@ -111,7 +113,19 @@ OK = object()
FAIL = object() FAIL = object()
class PsPrinter(object): class ProcessObservers(object):
"""ProcessObservers allows monitoring of child process."""
def poke(self):
"""poke is called when child process sent `BUF_SIZE` data to stdout."""
pass
def cancel(self):
"""cancel is called once proc exists successfully."""
pass
class PsPrinter(ProcessObservers):
def __init__(self, interval=300): def __init__(self, interval=300):
self.interval = interval self.interval = interval
self.active = sys.platform.startswith('linux2') self.active = sys.platform.startswith('linux2')
...@@ -139,6 +153,30 @@ class PsPrinter(object): ...@@ -139,6 +153,30 @@ class PsPrinter(object):
self.thread = None self.thread = None
class StaleProcess(ProcessObservers):
'''StaleProcess terminates process if there is no poke call in `interval`. '''
def __init__(self, interval, proc):
self.interval = interval
self.proc = proc
self.thread = None
def _terminate_process(self):
print('Terminating stale process...')
self.proc.terminate()
def poke(self):
self.cancel()
if self.interval > 0:
self.thread = threading.Timer(self.interval, self._terminate_process)
self.thread.start()
def cancel(self):
if self.thread is not None:
self.thread.cancel()
self.thread = None
def call(*args, **kwargs): # pragma: no cover def call(*args, **kwargs): # pragma: no cover
"""Interactive subprocess call.""" """Interactive subprocess call."""
kwargs['stdout'] = subprocess.PIPE kwargs['stdout'] = subprocess.PIPE
...@@ -165,12 +203,15 @@ def call(*args, **kwargs): # pragma: no cover ...@@ -165,12 +203,15 @@ def call(*args, **kwargs): # pragma: no cover
if stdin_data: if stdin_data:
proc.stdin.write(stdin_data) proc.stdin.write(stdin_data)
proc.stdin.close() proc.stdin.close()
psprinter = PsPrinter() stale_process_duration = env.get('STALE_PROCESS_DURATION',
STALE_PROCESS_DURATION)
observers = [PsPrinter(), StaleProcess(int(stale_process_duration), proc)]
# This is here because passing 'sys.stdout' into stdout for proc will # This is here because passing 'sys.stdout' into stdout for proc will
# produce out of order output. # produce out of order output.
hanging_cr = False hanging_cr = False
while True: while True:
psprinter.poke() for observer in observers:
observer.poke()
buf = proc.stdout.read(BUF_SIZE) buf = proc.stdout.read(BUF_SIZE)
if not buf: if not buf:
break break
...@@ -185,7 +226,8 @@ def call(*args, **kwargs): # pragma: no cover ...@@ -185,7 +226,8 @@ def call(*args, **kwargs): # pragma: no cover
if hanging_cr: if hanging_cr:
sys.stdout.write('\n') sys.stdout.write('\n')
out.write('\n') out.write('\n')
psprinter.cancel() for observer in observers:
observer.cancel()
code = proc.wait() code = proc.wait()
elapsed_time = ((time.time() - start_time) / 60.0) elapsed_time = ((time.time() - start_time) / 60.0)
...@@ -674,6 +716,7 @@ def _git_checkout(sln, sln_dir, revisions, refs, no_fetch_tags, git_cache_dir, ...@@ -674,6 +716,7 @@ def _git_checkout(sln, sln_dir, revisions, refs, no_fetch_tags, git_cache_dir,
if url == CHROMIUM_SRC_URL or url + '.git' == CHROMIUM_SRC_URL: if url == CHROMIUM_SRC_URL or url + '.git' == CHROMIUM_SRC_URL:
# This is for performance investigation of `git fetch` in chromium/src. # This is for performance investigation of `git fetch` in chromium/src.
env = { env = {
'GIT_CURL_VERBOSE': '1',
'GIT_TRACE': 'true', 'GIT_TRACE': 'true',
'GIT_TRACE_PERFORMANCE': 'true', 'GIT_TRACE_PERFORMANCE': 'true',
} }
...@@ -766,16 +809,6 @@ def _git_disable_gc(cwd): ...@@ -766,16 +809,6 @@ def _git_disable_gc(cwd):
git('config', 'gc.autopacklimit', '0', cwd=cwd) git('config', 'gc.autopacklimit', '0', cwd=cwd)
def _download(url):
"""Fetch url and return content, with retries for flake."""
for attempt in xrange(ATTEMPTS):
try:
return urllib2.urlopen(url).read()
except Exception:
if attempt == ATTEMPTS - 1:
raise
def get_commit_position(git_path, revision='HEAD'): def get_commit_position(git_path, revision='HEAD'):
"""Dumps the 'git' log for a specific revision and parses out the commit """Dumps the 'git' log for a specific revision and parses out the commit
position. position.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment