Commit 94c712fa authored by maruel@chromium.org's avatar maruel@chromium.org

Reimplement r109239 but using Popen.communicate() instead.

Enables threaded callback handler for subprocess.communicate().

R=dpranke@chromium.org
BUG=
TEST=


Review URL: http://codereview.chromium.org/8749015

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@112465 0039d316-1c4b-4281-b951-d872f2087c98
parent 5d0fc9a7
...@@ -8,12 +8,13 @@ In theory you shouldn't need anything else in subprocess, or this module failed. ...@@ -8,12 +8,13 @@ In theory you shouldn't need anything else in subprocess, or this module failed.
""" """
from __future__ import with_statement from __future__ import with_statement
import cStringIO
import errno import errno
import logging import logging
import os import os
import Queue
import subprocess import subprocess
import sys import sys
import tempfile
import time import time
import threading import threading
...@@ -170,6 +171,10 @@ class Popen(subprocess.Popen): ...@@ -170,6 +171,10 @@ class Popen(subprocess.Popen):
tmp_str += '; cwd=%s' % kwargs['cwd'] tmp_str += '; cwd=%s' % kwargs['cwd']
logging.debug(tmp_str) logging.debug(tmp_str)
self.stdout_cb = None
self.stderr_cb = None
self.stdout_void = False
self.stderr_void = False
def fix(stream): def fix(stream):
if kwargs.get(stream) in (VOID, os.devnull): if kwargs.get(stream) in (VOID, os.devnull):
# Replaces VOID with handle to /dev/null. # Replaces VOID with handle to /dev/null.
...@@ -178,11 +183,17 @@ class Popen(subprocess.Popen): ...@@ -178,11 +183,17 @@ class Popen(subprocess.Popen):
# When the pipe fills up, it will deadlock this process. Using a real # When the pipe fills up, it will deadlock this process. Using a real
# file works around that issue. # file works around that issue.
kwargs[stream] = open(os.devnull, 'w') kwargs[stream] = open(os.devnull, 'w')
setattr(self, stream + '_void', True)
if callable(kwargs.get(stream)):
# Callable stdout/stderr should be used only with call() wrappers.
setattr(self, stream + '_cb', kwargs[stream])
kwargs[stream] = PIPE
fix('stdout') fix('stdout')
fix('stderr') fix('stderr')
self.start = time.time() self.start = time.time()
self.timeout = None
self.shell = kwargs.get('shell', None) self.shell = kwargs.get('shell', None)
# Silence pylint on MacOSX # Silence pylint on MacOSX
self.returncode = None self.returncode = None
...@@ -205,6 +216,152 @@ class Popen(subprocess.Popen): ...@@ -205,6 +216,152 @@ class Popen(subprocess.Popen):
# through # through
raise raise
def _tee_threads(self, input): # pylint: disable=W0622
"""Does I/O for a process's pipes using threads.
It's the simplest and slowest implementation. Expect very slow behavior.
If there is a callback and it doesn't keep up with the calls, the timeout
effectiveness will be delayed accordingly.
"""
# Queue of either of <threadname> when done or (<threadname>, data). In
# theory we would like to limit to ~64kb items to not cause large memory
# usage when the callback blocks. It is not done because it slows down
# processing on OSX10.6 by a factor of 2x, making it even slower than
# Windows! Revisit this decision if it becomes a problem, e.g. crash
# because of memory exhaustion.
queue = Queue.Queue()
done = threading.Event()
def write_stdin():
try:
stdin_io = cStringIO.StringIO(input)
while True:
data = stdin_io.read(1024)
if data:
self.stdin.write(data)
else:
self.stdin.close()
break
finally:
queue.put('stdin')
def _queue_pipe_read(pipe, name):
"""Queues characters read from a pipe into a queue."""
try:
while True:
data = pipe.read(1)
if not data:
break
queue.put((name, data))
finally:
queue.put(name)
def timeout_fn():
try:
done.wait(self.timeout)
finally:
queue.put('timeout')
def wait_fn():
try:
self.wait()
finally:
queue.put('wait')
# Starts up to 5 threads:
# Wait for the process to quit
# Read stdout
# Read stderr
# Write stdin
# Timeout
threads = {
'wait': threading.Thread(target=wait_fn),
}
if self.timeout is not None:
threads['timeout'] = threading.Thread(target=timeout_fn)
if self.stdout_cb:
threads['stdout'] = threading.Thread(
target=_queue_pipe_read, args=(self.stdout, 'stdout'))
if self.stderr_cb:
threads['stderr'] = threading.Thread(
target=_queue_pipe_read, args=(self.stderr, 'stderr'))
if input:
threads['stdin'] = threading.Thread(target=write_stdin)
for t in threads.itervalues():
t.start()
timed_out = False
try:
# This thread needs to be optimized for speed.
while threads:
item = queue.get()
if item[0] is 'stdout':
self.stdout_cb(item[1])
elif item[0] is 'stderr':
self.stderr_cb(item[1])
else:
# A thread terminated.
threads[item].join()
del threads[item]
if item == 'wait':
# Terminate the timeout thread if necessary.
done.set()
elif item == 'timeout' and not timed_out and self.poll() is None:
logging.debug('Timed out after %fs: killing' % self.timeout)
self.kill()
timed_out = True
finally:
# Stop the threads.
done.set()
if 'wait' in threads:
# Accelerate things, otherwise it would hang until the child process is
# done.
logging.debug('Killing child because of an exception')
self.kill()
# Join threads.
for thread in threads.itervalues():
thread.join()
if timed_out:
self.returncode = TIMED_OUT
def communicate(self, input=None, timeout=None): # pylint: disable=W0221,W0622
"""Adds timeout and callbacks support.
Returns (stdout, stderr) like subprocess.Popen().communicate().
- The process will be killed after |timeout| seconds and returncode set to
TIMED_OUT.
"""
self.timeout = timeout
if not self.timeout and not self.stdout_cb and not self.stderr_cb:
return super(Popen, self).communicate(input)
if self.timeout and self.shell:
raise TypeError(
'Using timeout and shell simultaneously will cause a process leak '
'since the shell will be killed instead of the child process.')
stdout = None
stderr = None
# Convert to a lambda to workaround python's deadlock.
# http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
# When the pipe fills up, it will deadlock this process. Using a thread
# works around that issue. No need for thread safe function since the call
# backs are guaranteed to be called from the main thread.
if self.stdout and not self.stdout_cb and not self.stdout_void:
stdout = cStringIO.StringIO()
self.stdout_cb = stdout.write
if self.stderr and not self.stderr_cb and not self.stderr_void:
stderr = cStringIO.StringIO()
self.stderr_cb = stderr.write
self._tee_threads(input)
if stdout:
stdout = stdout.getvalue()
if stderr:
stderr = stderr.getvalue()
return (stdout, stderr)
def communicate(args, timeout=None, **kwargs): def communicate(args, timeout=None, **kwargs):
"""Wraps subprocess.Popen().communicate() and add timeout support. """Wraps subprocess.Popen().communicate() and add timeout support.
...@@ -226,39 +383,11 @@ def communicate(args, timeout=None, **kwargs): ...@@ -226,39 +383,11 @@ def communicate(args, timeout=None, **kwargs):
# set the Popen() parameter accordingly. # set the Popen() parameter accordingly.
kwargs['stdin'] = PIPE kwargs['stdin'] = PIPE
if not timeout: proc = Popen(args, **kwargs)
# Normal workflow. if stdin not in (None, VOID):
proc = Popen(args, **kwargs) return proc.communicate(stdin, timeout), proc.returncode
if stdin is not None: else:
return proc.communicate(stdin), proc.returncode return proc.communicate(None, timeout), proc.returncode
else:
return proc.communicate(), proc.returncode
# Create a temporary file to workaround python's deadlock.
# http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
# When the pipe fills up, it will deadlock this process. Using a real file
# works around that issue.
with tempfile.TemporaryFile() as buff:
kwargs['stdout'] = buff
proc = Popen(args, **kwargs)
if proc.shell:
raise TypeError(
'Using timeout and shell simultaneously will cause a process leak '
'since the shell will be killed instead of the child process.')
if stdin is not None:
proc.stdin.write(stdin)
while proc.returncode is None:
proc.poll()
if timeout and (time.time() - proc.start) > timeout:
proc.kill()
proc.wait()
# It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT.
proc.returncode = TIMED_OUT
time.sleep(0.001)
# Now that the process died, reset the cursor and read the file.
buff.seek(0)
out = (buff.read(), None)
return out, proc.returncode
def call(args, **kwargs): def call(args, **kwargs):
......
...@@ -77,7 +77,7 @@ class DefaultsTest(auto_stub.TestCase): ...@@ -77,7 +77,7 @@ class DefaultsTest(auto_stub.TestCase):
results.update(kwargs) results.update(kwargs)
results['args'] = args results['args'] = args
@staticmethod @staticmethod
def communicate(): def communicate(input=None, timeout=None): # pylint: disable=W0622
return None, None return None, None
self.mock(subprocess2, 'Popen', fake_Popen) self.mock(subprocess2, 'Popen', fake_Popen)
return results return results
...@@ -180,6 +180,12 @@ class BaseTestCase(unittest.TestCase): ...@@ -180,6 +180,12 @@ class BaseTestCase(unittest.TestCase):
self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL))
super(BaseTestCase, self).tearDown() super(BaseTestCase, self).tearDown()
def _check_res(self, res, stdout, stderr, returncode):
(out, err), code = res
self.assertEquals(stdout, out)
self.assertEquals(stderr, err)
self.assertEquals(returncode, code)
class RegressionTest(BaseTestCase): class RegressionTest(BaseTestCase):
# Regression tests to ensure that subprocess and subprocess2 have the same # Regression tests to ensure that subprocess and subprocess2 have the same
...@@ -299,6 +305,27 @@ class RegressionTest(BaseTestCase): ...@@ -299,6 +305,27 @@ class RegressionTest(BaseTestCase):
except subp.CalledProcessError, e: except subp.CalledProcessError, e:
self._check_exception(subp, e, None, None, 64) self._check_exception(subp, e, None, None, 64)
def test_redirect_stderr_to_stdout_pipe(self):
def fn(c, e, un, subp):
# stderr output into stdout.
proc = subp.Popen(
e + ['--stderr'],
stdout=subp.PIPE,
stderr=subp.STDOUT,
universal_newlines=un)
res = proc.communicate(), proc.returncode
self._check_res(res, c('a\nbb\nccc\n'), None, 0)
self._run_test(fn)
def test_redirect_stderr_to_stdout(self):
def fn(c, e, un, subp):
# stderr output into stdout but stdout is not piped.
proc = subp.Popen(
e + ['--stderr'], stderr=STDOUT, universal_newlines=un)
res = proc.communicate(), proc.returncode
self._check_res(res, None, None, 0)
self._run_test(fn)
class S2Test(BaseTestCase): class S2Test(BaseTestCase):
# Tests that can only run in subprocess2, e.g. new functionalities. # Tests that can only run in subprocess2, e.g. new functionalities.
...@@ -326,11 +353,11 @@ class S2Test(BaseTestCase): ...@@ -326,11 +353,11 @@ class S2Test(BaseTestCase):
function(noop, self.exe + ['--cr'], True) function(noop, self.exe + ['--cr'], True)
function(noop, self.exe + ['--crlf'], True) function(noop, self.exe + ['--crlf'], True)
def _check_res(self, res, stdout, stderr, returncode): def _check_exception(self, e, stdout, stderr, returncode):
(out, err), code = res """On exception, look if the exception members are set correctly."""
self.assertEquals(stdout, out) self.assertEquals(returncode, e.returncode)
self.assertEquals(stderr, err) self.assertEquals(stdout, e.stdout)
self.assertEquals(returncode, code) self.assertEquals(stderr, e.stderr)
def test_timeout(self): def test_timeout(self):
# timeout doesn't exist in subprocess. # timeout doesn't exist in subprocess.
...@@ -383,25 +410,128 @@ class S2Test(BaseTestCase): ...@@ -383,25 +410,128 @@ class S2Test(BaseTestCase):
self._check_res(res, None, None, 0) self._check_res(res, None, None, 0)
self._run_test(fn) self._run_test(fn)
def test_check_output_redirect_stderr_to_stdout_pipe(self): def test_tee_stderr(self):
def fn(c, e, un): def fn(c, e, un):
# stderr output into stdout. stderr = []
res = subprocess2.communicate( res = subprocess2.communicate(
e + ['--stderr'], e + ['--stderr'], stderr=stderr.append, universal_newlines=un)
stdout=PIPE, self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
stderr=STDOUT, self._check_res(res, None, None, 0)
self._run_test(fn)
def test_tee_stdout_stderr(self):
def fn(c, e, un):
stdout = []
stderr = []
res = subprocess2.communicate(
e + ['--stdout', '--stderr'],
stdout=stdout.append,
stderr=stderr.append,
universal_newlines=un) universal_newlines=un)
self._check_res(res, c('a\nbb\nccc\n'), None, 0) self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
self._check_res(res, None, None, 0)
self._run_test(fn) self._run_test(fn)
def test_check_output_redirect_stderr_to_stdout(self): def test_tee_stdin(self):
def fn(c, e, un): def fn(c, e, un):
# stderr output into stdout but stdout is not piped. stdout = []
stdin = '0123456789'
res = subprocess2.communicate( res = subprocess2.communicate(
e + ['--stderr'], stderr=STDOUT, universal_newlines=un) e + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append,
universal_newlines=un)
self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
self._check_res(res, None, None, 0) self._check_res(res, None, None, 0)
self._run_test(fn) self._run_test(fn)
def test_tee_throw(self):
def fn(c, e, un):
stderr = []
try:
subprocess2.check_output(
e + ['--stderr', '--fail'], stderr=stderr.append,
universal_newlines=un)
self.fail()
except subprocess2.CalledProcessError, e:
self._check_exception(e, '', None, 64)
self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
self._run_test(fn)
def test_tee_timeout_stdout_void(self):
def fn(c, e, un):
stderr = []
res = subprocess2.communicate(
e + ['--stdout', '--stderr', '--fail'],
stdout=VOID,
stderr=stderr.append,
shell=False,
timeout=10,
universal_newlines=un)
self._check_res(res, None, None, 64)
self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
self._run_test(fn)
def test_tee_timeout_stderr_void(self):
def fn(c, e, un):
stdout = []
res = subprocess2.communicate(
e + ['--stdout', '--stderr', '--fail'],
stdout=stdout.append,
stderr=VOID,
shell=False,
timeout=10,
universal_newlines=un)
self._check_res(res, None, None, 64)
self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
self._run_test(fn)
def test_tee_timeout_stderr_stdout(self):
def fn(c, e, un):
stdout = []
res = subprocess2.communicate(
e + ['--stdout', '--stderr', '--fail'],
stdout=stdout.append,
stderr=STDOUT,
shell=False,
timeout=10,
universal_newlines=un)
self._check_res(res, None, None, 64)
# Ordering is random due to buffering.
self.assertEquals(
set(c('a\nbb\nccc\nA\nBB\nCCC\n').splitlines(True)),
set(''.join(stdout).splitlines(True)))
self._run_test(fn)
def test_tee_large(self):
stdout = []
# Read 128kb. On my workstation it takes >2s. Welcome to 2011.
res = subprocess2.communicate(self.exe + ['--large'], stdout=stdout.append)
self.assertEquals(128*1024, len(''.join(stdout)))
self._check_res(res, None, None, 0)
def test_tee_large_stdin(self):
stdout = []
# Write 128kb.
stdin = '0123456789abcdef' * (8*1024)
res = subprocess2.communicate(
self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append)
self.assertEquals(128*1024, len(''.join(stdout)))
self._check_res(res, None, None, 0)
def test_tee_cb_throw(self):
# Having a callback throwing up should not cause side-effects. It's a bit
# hard to measure.
class Blow(Exception):
pass
def blow(_):
raise Blow()
proc = subprocess2.Popen(self.exe + ['--stdout'], stdout=blow)
try:
proc.communicate()
self.fail()
except Blow:
self.assertNotEquals(0, proc.returncode)
def child_main(args): def child_main(args):
if sys.platform == 'win32': if sys.platform == 'win32':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment