Commit cc2fe9b7 authored by Scott Lee's avatar Scott Lee Committed by LUCI CQ

[resultdb] use requests.Session in rdb_wrapper

requests.post() creates a new HTTP connection for each call.
It makes presubmit checks slower and timed out.
This CL updates rdb_wrapper to use requests.Session() to use
a connection pool for Sink requests.

Change-Id: Id02ecce85898ae15993d53df17473bb5dfb7e89f
Bug: 1145762
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/2532895
Commit-Queue: Scott Lee <ddoman@chromium.org>
Reviewed-by: 's avatarChan Li <chanli@chromium.org>
Reviewed-by: 's avatarErik Staab <estaab@chromium.org>
Reviewed-by: 's avatarDirk Pranke <dpranke@google.com>
parent 55896522
......@@ -1581,30 +1581,34 @@ class PresubmitExecuter(object):
results = []
try:
if 'PRESUBMIT_VERSION' in context and \
[int(x) for x in context['PRESUBMIT_VERSION'].split('.')] >= [2, 0, 0]:
for function_name in context:
if not function_name.startswith('Check'):
continue
if function_name.endswith('Commit') and not self.committing:
continue
if function_name.endswith('Upload') and self.committing:
continue
logging.debug('Running %s in %s', function_name, presubmit_path)
results.extend(
self._run_check_function(function_name, context, prefix))
logging.debug('Running %s done.', function_name)
self.more_cc.extend(output_api.more_cc)
else: # Old format
if self.committing:
function_name = 'CheckChangeOnCommit'
else:
function_name = 'CheckChangeOnUpload'
if function_name in context:
version = [
int(x) for x in context.get('PRESUBMIT_VERSION', '0.0.0').split('.')
]
with rdb_wrapper.client(prefix) as sink:
if version >= [2, 0, 0]:
for function_name in context:
if not function_name.startswith('Check'):
continue
if function_name.endswith('Commit') and not self.committing:
continue
if function_name.endswith('Upload') and self.committing:
continue
logging.debug('Running %s in %s', function_name, presubmit_path)
results.extend(
self._run_check_function(function_name, context, sink))
logging.debug('Running %s done.', function_name)
self.more_cc.extend(output_api.more_cc)
else: # Old format
if self.committing:
function_name = 'CheckChangeOnCommit'
else:
function_name = 'CheckChangeOnUpload'
if function_name in context:
logging.debug('Running %s in %s', function_name, presubmit_path)
results.extend(
self._run_check_function(function_name, context, prefix))
self._run_check_function(function_name, context, sink))
logging.debug('Running %s done.', function_name)
self.more_cc.extend(output_api.more_cc)
......@@ -1616,23 +1620,37 @@ class PresubmitExecuter(object):
os.chdir(main_path)
return results
def _run_check_function(self, function_name, context, prefix):
"""Evaluates a presubmit check function, function_name, in the context
provided. If LUCI_CONTEXT is enabled, it will send the result to ResultSink.
Passes function_name and prefix to rdb_wrapper.setup_rdb. Returns results.
def _run_check_function(self, function_name, context, sink=None):
"""Evaluates and returns the result of a given presubmit function.
If sink is given, the result of the presubmit function will be reported
to the ResultSink.
Args:
function_name: a string representing the name of the function to run
function_name: the name of the presubmit function to evaluate
context: a context dictionary in which the function will be evaluated
prefix: a string describing prefix for ResultDB test id
Returns: Results from evaluating the function call."""
with rdb_wrapper.setup_rdb(function_name, prefix) as my_status:
sink: an instance of ResultSink. None, by default.
Returns:
the result of the presubmit function call.
"""
start_time = time_time()
try:
result = eval(function_name + '(*__args)', context)
self._check_result_type(result)
if any(res.fatal for res in result):
my_status.status = rdb_wrapper.STATUS_FAIL
return result
except:
if sink:
elapsed_time = time_time() - start_time
sink.report(function_name, rdb_wrapper.STATUS_FAIL, elapsed_time)
raise
if sink:
elapsed_time = time_time() - start_time
status = rdb_wrapper.STATUS_PASS
if any(r.fatal for r in result):
status = rdb_wrapper.STATUS_FAIL
sink.report(function_name, status, elapsed_time)
return result
def _check_result_type(self, result):
"""Helper function which ensures result is a list, and all elements are
......
......@@ -16,52 +16,65 @@ STATUS_CRASH = 'CRASH'
STATUS_ABORT = 'ABORT'
STATUS_SKIP = 'SKIP'
class ResultSinkStatus(object):
def __init__(self):
self.status = STATUS_PASS
class ResultSink(object):
def __init__(self, session, url, prefix):
self._session = session
self._url = url
self._prefix = prefix
def report(self, function_name, status, elapsed_time):
"""Reports the result and elapsed time of a presubmit function call.
Args:
function_name (str): The name of the presubmit function
status: the status to report the function call with
elapsed_time: the time taken to invoke the presubmit function
"""
tr = {
'testId': self._prefix + function_name,
'status': status,
'expected': status == STATUS_PASS,
'duration': '{:.9f}s'.format(elapsed_time)
}
self._session.post(self._url, json={'testResults': [tr]})
@contextlib.contextmanager
def setup_rdb(function_name, prefix):
"""Context Manager function for ResultDB reporting.
def client(prefix):
"""Returns a client for ResultSink.
This is a context manager that returns a client for ResultSink,
if LUCI_CONTEXT with a section of result_sink is present. When the context
is closed, all the connetions to the SinkServer are closed.
Args:
function_name (str): The name of the function we are about to run.
prefix (str): The prefix for the name of the test. The format for this is
presubmit:gerrit_host/folder/to/repo:path/to/file/
prefix: A prefix to be added to the test ID of reported function names.
The format for this is
presubmit:gerrit_host/folder/to/repo:path/to/file/
for example,
presubmit:chromium-review.googlesource.com/chromium/src/:services/viz/
presubmit:chromium-review.googlesource.com/chromium/src/:services/viz/
Returns:
An instance of ResultSink() if the luci context is present. None, otherwise.
"""
sink = None
if 'LUCI_CONTEXT' in os.environ:
with open(os.environ['LUCI_CONTEXT']) as f:
j = json.load(f)
if 'result_sink' in j:
sink = j['result_sink']
luci_ctx = os.environ.get('LUCI_CONTEXT')
if not luci_ctx:
yield None
return
sink_ctx = None
with open(luci_ctx) as f:
sink_ctx = json.load(f).get('result_sink')
if not sink_ctx:
yield None
return
my_status = ResultSinkStatus()
start_time = time.time()
try:
yield my_status
except Exception:
my_status.status = STATUS_FAIL
raise
finally:
end_time = time.time()
elapsed_time = end_time - start_time
if sink != None:
tr = {
'testId': '{0}:{1}'.format(prefix, function_name),
'status': my_status.status,
'expected': (my_status.status == STATUS_PASS),
'duration': '{:.9f}s'.format(elapsed_time)
}
requests.post(
url='http://{0}/prpc/luci.resultsink.v1.Sink/ReportTestResults'
.format(sink['address']),
headers={
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': 'ResultSink {0}'.format(sink['auth_token'])
},
data=json.dumps({'testResults': [tr]})
)
url = 'http://{0}/prpc/luci.resultsink.v1.Sink/ReportTestResults'.format(
sink_ctx['address'])
with requests.Session() as s:
s.headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': 'ResultSink {0}'.format(sink_ctx['auth_token'])
}
yield ResultSink(s, url, prefix)
......@@ -46,6 +46,7 @@ import json
import owners
import owners_finder
import presubmit_support as presubmit
import rdb_wrapper
import scm
import subprocess2 as subprocess
......@@ -168,6 +169,7 @@ index fe3de7b..54ae6e1 100755
presubmit._ASKED_FOR_FEEDBACK = False
self.fake_root_dir = self.RootDir()
self.fake_change = FakeChange(self)
self.rdb_client = mock.MagicMock()
mock.patch('gclient_utils.FileRead').start()
mock.patch('gclient_utils.FileWrite').start()
......@@ -180,7 +182,8 @@ index fe3de7b..54ae6e1 100755
mock.patch('os.path.isfile').start()
mock.patch('os.remove').start()
mock.patch('presubmit_support._parse_files').start()
mock.patch('presubmit_support.rdb_wrapper.setup_rdb').start()
mock.patch('presubmit_support.rdb_wrapper.client',
return_value=self.rdb_client).start()
mock.patch('presubmit_support.sigint_handler').start()
mock.patch('presubmit_support.time_time', return_value=0).start()
mock.patch('presubmit_support.warn').start()
......@@ -538,14 +541,45 @@ class PresubmitUnittest(PresubmitTestsBase):
' return results\n',
fake_presubmit))
presubmit.rdb_wrapper.setup_rdb.assert_called()
self.assertRaises(presubmit.PresubmitFailure,
executer.ExecPresubmitScript,
'def CheckChangeOnCommit(input_api, output_api):\n'
' return ["foo"]',
fake_presubmit)
def testExecPresubmitScriptWithResultDB(self):
description_lines = ('Hello there', 'this is a change', 'BUG=123')
files = [['A', 'foo\\blat.cc']]
fake_presubmit = os.path.join(self.fake_root_dir, 'PRESUBMIT.py')
change = presubmit.Change('mychange', '\n'.join(description_lines),
self.fake_root_dir, files, 0, 0, None)
executer = presubmit.PresubmitExecuter(change, True, None, False)
sink = self.rdb_client.__enter__.return_value = mock.MagicMock()
# STATUS_PASS on success
executer.ExecPresubmitScript(
'def CheckChangeOnCommit(input_api, output_api):\n'
' return [output_api.PresubmitResult("test")]\n', fake_presubmit)
sink.report.assert_called_with('CheckChangeOnCommit',
rdb_wrapper.STATUS_PASS, 0)
# STATUS_FAIL on exception
sink.reset_mock()
self.assertRaises(
Exception, executer.ExecPresubmitScript,
'def CheckChangeOnCommit(input_api, output_api):\n'
' raise Exception("boom")', fake_presubmit)
sink.report.assert_called_with('CheckChangeOnCommit',
rdb_wrapper.STATUS_FAIL, 0)
# STATUS_FAIL on fatal error
sink.reset_mock()
executer.ExecPresubmitScript(
'def CheckChangeOnCommit(input_api, output_api):\n'
' return [output_api.PresubmitError("error")]\n', fake_presubmit)
sink.report.assert_called_with('CheckChangeOnCommit',
rdb_wrapper.STATUS_FAIL, 0)
def testExecPresubmitScriptTemporaryFilesRemoval(self):
tempfile.NamedTemporaryFile.side_effect = [
MockTemporaryFile('baz'),
......
......@@ -7,80 +7,96 @@
from __future__ import print_function
import contextlib
import json
import logging
import os
import requests
import sys
import tempfile
import time
import unittest
if sys.version_info.major == 2:
import mock
BUILTIN_OPEN = '__builtin__.open'
else:
from unittest import mock
BUILTIN_OPEN = 'builtins.open'
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import rdb_wrapper
class TestSetupRDB(unittest.TestCase):
def setUp(self):
super(TestSetupRDB, self).setUp()
mock.patch(BUILTIN_OPEN, mock.mock_open(read_data =
'''{"result_sink":{"address": "fakeAddr","auth_token" : "p@$$w0rD"}}''')
).start()
mock.patch('os.environ', {'LUCI_CONTEXT': 'dummy_file.txt'}).start()
mock.patch('requests.post').start()
mock.patch('time.time', side_effect=[1.0, 2.0, 3.0, 4.0, 5.0]).start()
def test_setup_rdb(self):
with rdb_wrapper.setup_rdb("_foobar", './my/folder/') as my_status_obj:
self.assertEqual(my_status_obj.status, rdb_wrapper.STATUS_PASS)
my_status_obj.status = rdb_wrapper.STATUS_FAIL
expectedTr = {
'testId' : './my/folder/:_foobar',
'status' : rdb_wrapper.STATUS_FAIL,
'expected': False,
'duration': '1.000000000s'
}
requests.post.assert_called_once_with(
url='http://fakeAddr/prpc/luci.resultsink.v1.Sink/ReportTestResults',
headers={
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': 'ResultSink p@$$w0rD'
},
data=json.dumps({'testResults': [expectedTr]})
)
@contextlib.contextmanager
def lucictx(ctx):
try:
orig = os.environ.get('LUCI_CONTEXT')
def test_setup_rdb_exception(self):
with self.assertRaises(Exception):
with rdb_wrapper.setup_rdb("_foobar", './my/folder/'):
raise Exception("Generic Error")
if ctx is None:
os.environ.pop('LUCI_CONTEXT', '')
yield
else:
# windows doesn't allow a file to be opened twice at the same time.
# therefore, this closes the temp file before yield, so that
# rdb_wrapper.client() can open the LUCI_CONTEXT file.
f = tempfile.NamedTemporaryFile(delete=False)
f.write(json.dumps(ctx).encode('utf-8'))
f.close()
os.environ['LUCI_CONTEXT'] = f.name
yield
os.unlink(f.name)
expectedTr = {
'testId': './my/folder/:_foobar',
'status': rdb_wrapper.STATUS_FAIL,
'expected': False,
'duration': '1.000000000s'
}
finally:
if orig is None:
os.environ.pop('LUCI_CONTEXT', '')
else:
os.environ['LUCI_CONTEXT'] = orig
@mock.patch.dict(os.environ, {})
class TestClient(unittest.TestCase):
def test_without_lucictx(self):
with lucictx(None):
with rdb_wrapper.client("prefix") as s:
self.assertIsNone(s)
requests.post.assert_called_once_with(
url='http://fakeAddr/prpc/luci.resultsink.v1.Sink/ReportTestResults',
headers={
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': 'ResultSink p@$$w0rD'
},
data=json.dumps({'testResults': [expectedTr]})
with lucictx({'something else': {'key': 'value'}}):
with rdb_wrapper.client("prefix") as s:
self.assertIsNone(s)
def test_with_lucictx(self):
with lucictx({'result_sink': {'address': '127', 'auth_token': 'secret'}}):
with rdb_wrapper.client("prefix") as s:
self.assertIsNotNone(s)
self.assertEqual(
s._url,
'http://127/prpc/luci.resultsink.v1.Sink/ReportTestResults',
)
self.assertDictEqual(
s._session.headers, {
'Accept': 'application/json',
'Authorization': 'ResultSink secret',
'Content-Type': 'application/json',
})
class TestResultSink(unittest.TestCase):
def test_report(self):
session = mock.MagicMock()
sink = rdb_wrapper.ResultSink(session, 'http://host', 'test_id_prefix/')
sink.report("function_foo", rdb_wrapper.STATUS_PASS, 123)
expected = {
'testId': 'test_id_prefix/function_foo',
'status': rdb_wrapper.STATUS_PASS,
'expected': True,
'duration': '123.000000000s',
}
session.post.assert_called_once_with(
'http://host',
json={'testResults': [expected]},
)
if __name__ == '__main__':
logging.basicConfig(
level=logging.DEBUG if '-v' in sys.argv else logging.ERROR)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment