Commit a9c908eb authored by Michal Majewski's avatar Michal Majewski Committed by Commit Bot

Remove test runner's network mode.

Bug: v8:6917
Change-Id: I7dae0715264cdf9f963f2454b101f6260d8493ff
Reviewed-on: https://chromium-review.googlesource.com/758837Reviewed-by: 's avatarMichael Achenbach <machenbach@chromium.org>
Commit-Queue: Michał Majewski <majeski@google.com>
Cr-Commit-Position: refs/heads/master@{#49256}
parent 6aa3349a
#!/usr/bin/env python
#
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
import subprocess
import sys
PIDFILE = "/tmp/v8-distributed-testing-server.pid"
ROOT = os.path.abspath(os.path.dirname(sys.argv[0]))
def _PrintUsage():
print("""Usage: python %s COMMAND
Where COMMAND can be any of:
start Starts the server. Forks to the background.
stop Stops the server.
restart Stops, then restarts the server.
setup Creates or updates the environment for the server to run.
update Alias for "setup".
trust <keyfile> Adds the given public key to the list of trusted keys.
help Displays this help text.
""" % sys.argv[0])
def _IsDaemonRunning():
return os.path.exists(PIDFILE)
def _Cmd(cmd):
code = subprocess.call(cmd, shell=True)
if code != 0:
print("Command '%s' returned error code %d" % (cmd, code))
sys.exit(code)
def Update():
# Create directory for private data storage.
data_dir = os.path.join(ROOT, "data")
if not os.path.exists(data_dir):
os.makedirs(data_dir)
# Create directory for trusted public keys of peers (and self).
trusted_dir = os.path.join(ROOT, "trusted")
if not os.path.exists(trusted_dir):
os.makedirs(trusted_dir)
# Install UltraJSON. It is much faster than Python's builtin json.
try:
import ujson #@UnusedImport
except ImportError:
# Install pip if it doesn't exist.
code = subprocess.call("which pip > /dev/null", shell=True)
if code != 0:
apt_get_code = subprocess.call("which apt-get > /dev/null", shell=True)
if apt_get_code == 0:
print("Installing pip...")
_Cmd("sudo apt-get install python-pip")
else:
print("Please install pip on your machine. You can get it at: "
"http://www.pip-installer.org/en/latest/installing.html "
"or via your distro's package manager.")
sys.exit(1)
print("Using pip to install UltraJSON...")
_Cmd("sudo pip install ujson")
# Make sure we have a key pair for signing binaries.
privkeyfile = os.path.expanduser("~/.ssh/v8_dtest")
if not os.path.exists(privkeyfile):
_Cmd("ssh-keygen -t rsa -f %s -N '' -q" % privkeyfile)
fingerprint = subprocess.check_output("ssh-keygen -lf %s" % privkeyfile,
shell=True)
fingerprint = fingerprint.split(" ")[1].replace(":", "")[:16]
pubkeyfile = os.path.join(trusted_dir, "%s.pem" % fingerprint)
if (not os.path.exists(pubkeyfile) or
os.path.getmtime(pubkeyfile) < os.path.getmtime(privkeyfile)):
_Cmd("openssl rsa -in %s -out %s -pubout" % (privkeyfile, pubkeyfile))
with open(pubkeyfile, "a") as f:
f.write(fingerprint + "\n")
datafile = os.path.join(data_dir, "mypubkey")
with open(datafile, "w") as f:
f.write(fingerprint + "\n")
# Check out or update the server implementation in the current directory.
testrunner_dir = os.path.join(ROOT, "testrunner")
if os.path.exists(os.path.join(testrunner_dir, "server/daemon.py")):
_Cmd("cd %s; svn up" % testrunner_dir)
else:
path = ("http://v8.googlecode.com/svn/branches/bleeding_edge/"
"tools/testrunner")
_Cmd("svn checkout --force %s %s" % (path, testrunner_dir))
# Update this very script.
path = ("http://v8.googlecode.com/svn/branches/bleeding_edge/"
"tools/test-server.py")
scriptname = os.path.abspath(sys.argv[0])
_Cmd("svn cat %s > %s" % (path, scriptname))
# Check out or update V8.
v8_dir = os.path.join(ROOT, "v8")
if os.path.exists(v8_dir):
_Cmd("cd %s; git fetch" % v8_dir)
else:
_Cmd("git clone git://github.com/v8/v8.git %s" % v8_dir)
print("Finished.")
# Handle "setup" here, because when executing that we can't import anything
# else yet.
if __name__ == "__main__" and len(sys.argv) == 2:
if sys.argv[1] in ("setup", "update"):
if _IsDaemonRunning():
print("Please stop the server before updating. Exiting.")
sys.exit(1)
Update()
sys.exit(0)
# Other parameters are handled below.
#==========================================================
# At this point we can assume that the implementation is available,
# so we can import it.
try:
from testrunner.server import constants
from testrunner.server import local_handler
from testrunner.server import main
except Exception, e:
print(e)
print("Failed to import implementation. Have you run 'setup'?")
sys.exit(1)
def _StartDaemon(daemon):
if not os.path.isdir(os.path.join(ROOT, "v8")):
print("No 'v8' working directory found. Have you run 'setup'?")
sys.exit(1)
daemon.start()
if __name__ == "__main__":
if len(sys.argv) == 2:
arg = sys.argv[1]
if arg == "start":
daemon = main.Server(PIDFILE, ROOT)
_StartDaemon(daemon)
elif arg == "stop":
daemon = main.Server(PIDFILE, ROOT)
daemon.stop()
elif arg == "restart":
daemon = main.Server(PIDFILE, ROOT)
daemon.stop()
_StartDaemon(daemon)
elif arg in ("help", "-h", "--help"):
_PrintUsage()
elif arg == "status":
if not _IsDaemonRunning():
print("Server not running.")
else:
print(local_handler.LocalQuery([constants.REQUEST_STATUS]))
else:
print("Unknown command")
_PrintUsage()
sys.exit(2)
elif len(sys.argv) == 3:
arg = sys.argv[1]
if arg == "approve":
filename = sys.argv[2]
if not os.path.exists(filename):
print("%s does not exist.")
sys.exit(1)
filename = os.path.abspath(filename)
if _IsDaemonRunning():
response = local_handler.LocalQuery([constants.ADD_TRUSTED, filename])
else:
daemon = main.Server(PIDFILE, ROOT)
response = daemon.CopyToTrusted(filename)
print("Added certificate %s to trusted certificates." % response)
else:
print("Unknown command")
_PrintUsage()
sys.exit(2)
else:
print("Unknown command")
_PrintUsage()
sys.exit(2)
sys.exit(0)
Test suite runner for V8, including support for distributed running.
====================================================================
Local usage instructions:
=========================
Run the main script with --help to get detailed usage instructions:
$ tools/run-tests.py --help
The interface is mostly the same as it was for the old test runner.
You'll likely want something like this:
$ tools/run-tests.py --nonetwork --arch ia32 --mode release
--nonetwork is the default on Mac and Windows. If you don't specify --arch
and/or --mode, all available values will be used and run in turn (e.g.,
omitting --mode from the above example will run ia32 in both Release and Debug
modes).
Networked usage instructions:
=============================
Networked running is only supported on Linux currently. Make sure that all
machines participating in the cluster are binary-compatible (e.g. mixing
Ubuntu Lucid and Precise doesn't work).
Setup:
------
1.) Copy tools/test-server.py to a new empty directory anywhere on your hard
drive (preferably not inside your V8 checkout just to keep things clean).
Please do create a copy, not just a symlink.
2.) Navigate to the new directory and let the server setup itself:
$ ./test-server.py setup
This will install PIP and UltraJSON, create a V8 working directory, and
generate a keypair.
3.) Swap public keys with someone who's already part of the networked cluster.
$ cp trusted/`cat data/mypubkey`.pem /where/peers/can/see/it/myname.pem
$ ./test-server.py approve /wherever/they/put/it/yourname.pem
Usage:
------
1.) Start your server:
$ ./test-server.py start
2.) (Optionally) inspect the server's status:
$ ./test-server.py status
3.) From your regular V8 working directory, run tests:
$ tool/run-tests.py --arch ia32 --mode debug
4.) (Optionally) enjoy the speeeeeeeeeeeeeeeed
Architecture overview:
======================
Code organization:
------------------
This section is written from the point of view of the tools/ directory.
./run-tests.py:
Main script. Parses command-line options and drives the test execution
procedure from a high level. Imports the actual implementation of all
steps from the testrunner/ directory.
./test-server.py:
Interface to interact with the server. Contains code to setup the server's
working environment and can start and stop server daemon processes.
Imports some stuff from the testrunner/server/ directory.
./testrunner/local/*:
Implementation needed to run tests locally. Used by run-tests.py. Inspired by
(and partly copied verbatim from) the original test.py script.
./testrunner/objects/*:
A bunch of data container classes, used by the scripts in the various other
directories; serializable for transmission over the network.
./testrunner/network/*:
Equivalents and extensions of some of the functionality in ./testrunner/local/
as required when dispatching tests to peers on the network.
./testrunner/network/network_execution.py:
Drop-in replacement for ./testrunner/local/execution that distributes
test jobs to network peers instead of running them locally.
./testrunner/network/endpoint.py:
Receiving end of a network distributed job, uses the implementation
in ./testrunner/local/execution.py for actually running the tests.
./testrunner/server/*:
Implementation of the daemon that accepts and runs test execution jobs from
peers on the network. Should ideally have no dependencies on any of the other
directories, but that turned out to be impractical, so there are a few
exceptions.
./testrunner/server/compression.py:
Defines a wrapper around Python TCP sockets that provides JSON based
serialization, gzip based compression, and ensures message completeness.
Networking architecture:
------------------------
The distribution stuff is designed to be a layer between deciding which tests
to run on the one side, and actually running them on the other. The frontend
that the user interacts with is the same for local and networked execution,
and the actual test execution and result gathering code is the same too.
The server daemon starts four separate servers, each listening on another port:
- "Local": Communication with a run-tests.py script running on the same host.
The test driving script e.g. needs to ask for available peers. It then talks
to those peers directly (one of them will be the locally running server).
- "Work": Listens for test job requests from run-tests.py scripts on the network
(including localhost). Accepts an arbitrary number of connections at the
same time, but only works on them in a serialized fashion.
- "Status": Used for communication with other servers on the network, e.g. for
exchanging trusted public keys to create the transitive trust closure.
- "Discovery": Used to detect presence of other peers on the network.
In contrast to the other three, this uses UDP (as opposed to TCP).
Give us a diagram! We love diagrams!
------------------------------------
.
Machine A . Machine B
.
+------------------------------+ .
| run-tests.py | .
| with flag: | .
|--nonetwork --network | .
| | / | | .
| | / | | .
| v / v | .
|BACKEND / distribution | .
+--------- / --------| \ ------+ .
/ | \_____________________
/ | . \
/ | . \
+----- v ----------- v --------+ . +---- v -----------------------+
| LocalHandler | WorkHandler | . | WorkHandler | LocalHandler |
| | | | . | | | |
| | v | . | v | |
| | BACKEND | . | BACKEND | |
|------------- +---------------| . |---------------+--------------|
| Discovery | StatusHandler <----------> StatusHandler | Discovery |
+---- ^ -----------------------+ . +-------------------- ^ -------+
| . |
+---------------------------------------------------------+
Note that the three occurrences of "BACKEND" are the same code
(testrunner/local/execution.py and its imports), but running from three
distinct directories (and on two different machines).
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
class Shell(object):
def __init__(self, shell):
self.shell = shell
self.tests = []
self.total_duration = 0.0
def AddSuite(self, suite):
self.tests += suite.tests
self.total_duration += suite.total_duration
def SortTests(self):
self.tests.sort(cmp=lambda x, y: cmp(x.duration, y.duration))
def Assign(suites, peers):
total_work = 0.0
for s in suites:
total_work += s.CalculateTotalDuration()
total_power = 0.0
for p in peers:
p.assigned_work = 0.0
total_power += p.jobs * p.relative_performance
for p in peers:
p.needed_work = total_work * p.jobs * p.relative_performance / total_power
shells = {}
for s in suites:
shell = s.shell()
if not shell in shells:
shells[shell] = Shell(shell)
shells[shell].AddSuite(s)
# Convert |shells| to list and sort it, shortest total_duration first.
shells = [ shells[s] for s in shells ]
shells.sort(cmp=lambda x, y: cmp(x.total_duration, y.total_duration))
# Sort tests within each shell, longest duration last (so it's
# pop()'ed first).
for s in shells: s.SortTests()
# Sort peers, least needed_work first.
peers.sort(cmp=lambda x, y: cmp(x.needed_work, y.needed_work))
index = 0
for shell in shells:
while len(shell.tests) > 0:
while peers[index].needed_work <= 0:
index += 1
if index == len(peers):
print("BIG FAT WARNING: Assigning tests to peers failed. "
"Remaining tests: %d. Going to slow mode." % len(shell.tests))
# Pick the least-busy peer. Sorting the list for each test
# is terribly slow, but this is just an emergency fallback anyway.
peers.sort(cmp=lambda x, y: cmp(x.needed_work, y.needed_work))
peers[0].ForceAddOneTest(shell.tests.pop(), shell)
# If the peer already has a shell assigned and would need this one
# and then yet another, try to avoid it.
peer = peers[index]
if (shell.total_duration < peer.needed_work and
len(peer.shells) > 0 and
index < len(peers) - 1 and
shell.total_duration <= peers[index + 1].needed_work):
peers[index + 1].AddTests(shell)
else:
peer.AddTests(shell)
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import multiprocessing
import os
import Queue
import threading
import time
from ..local import execution
from ..local import progress
from ..local import testsuite
from ..local import utils
from ..server import compression
class EndpointProgress(progress.ProgressIndicator):
def __init__(self, sock, server, ctx):
super(EndpointProgress, self).__init__()
self.sock = sock
self.server = server
self.context = ctx
self.results_queue = [] # Accessors must synchronize themselves.
self.sender_lock = threading.Lock()
self.senderthread = threading.Thread(target=self._SenderThread)
self.senderthread.start()
def HasRun(self, test, has_unexpected_output):
# The runners that call this have a lock anyway, so this is safe.
self.results_queue.append(test)
def _SenderThread(self):
keep_running = True
tests = []
self.sender_lock.acquire()
while keep_running:
time.sleep(0.1)
# This should be "atomic enough" without locking :-)
# (We don't care which list any new elements get appended to, as long
# as we don't lose any and the last one comes last.)
current = self.results_queue
self.results_queue = []
for c in current:
if c is None:
keep_running = False
else:
tests.append(c)
if keep_running and len(tests) < 1:
continue # Wait for more results.
if len(tests) < 1: break # We're done here.
result = []
for t in tests:
result.append(t.PackResult())
try:
compression.Send(result, self.sock)
except:
self.runner.terminate = True
for t in tests:
self.server.CompareOwnPerf(t, self.context.arch, self.context.mode)
tests = []
self.sender_lock.release()
def Execute(workspace, ctx, tests, sock, server):
suite_paths = utils.GetSuitePaths(os.path.join(workspace, "test"))
suites = []
for root in suite_paths:
suite = testsuite.TestSuite.LoadTestSuite(
os.path.join(workspace, "test", root))
if suite:
suite.SetupWorkingDirectory()
suites.append(suite)
suites_dict = {}
for s in suites:
suites_dict[s.name] = s
s.tests = []
for t in tests:
suite = suites_dict[t.suite]
t.suite = suite
suite.tests.append(t)
suites = [ s for s in suites if len(s.tests) > 0 ]
progress_indicator = EndpointProgress(sock, server, ctx)
runner = execution.Runner(suites, progress_indicator, ctx)
try:
runner.Run(server.jobs)
except IOError, e:
if e.errno == 2:
message = ("File not found: %s, maybe you forgot to 'git add' it?" %
e.filename)
else:
message = "%s" % e
compression.Send([[-1, message]], sock)
progress_indicator.HasRun(None, None) # Sentinel to signal the end.
progress_indicator.sender_lock.acquire() # Released when sending is done.
progress_indicator.sender_lock.release()
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
import socket
import subprocess
import threading
import time
from . import distro
from ..local import execution
from ..local import perfdata
from ..objects import peer
from ..objects import workpacket
from ..server import compression
from ..server import constants
from ..server import local_handler
from ..server import signatures
def GetPeers():
data = local_handler.LocalQuery([constants.REQUEST_PEERS])
if not data: return []
return [ peer.Peer.Unpack(p) for p in data ]
class NetworkedRunner(execution.Runner):
def __init__(self, suites, progress_indicator, context, peers, workspace):
self.suites = suites
datapath = os.path.join("out", "testrunner_data")
# TODO(machenbach): These fields should exist now in the superclass.
# But there is no super constructor call. Check if this is a problem.
self.perf_data_manager = perfdata.PerfDataManager(datapath)
self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode)
for s in suites:
for t in s.tests:
t.duration = self.perfdata.FetchPerfData(t) or 1.0
self._CommonInit(suites, progress_indicator, context)
self.tests = [] # Only used if we need to fall back to local execution.
self.tests_lock = threading.Lock()
self.peers = peers
self.pubkey_fingerprint = None # Fetched later.
self.base_rev = subprocess.check_output(
"cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace,
shell=True).strip()
self.base_svn_rev = subprocess.check_output(
"cd %s; git log -1 %s" # Get commit description.
" | grep -e '^\s*git-svn-id:'" # Extract "git-svn-id" line.
" | awk '{print $2}'" # Extract "repository@revision" part.
" | sed -e 's/.*@//'" % # Strip away "repository@".
(workspace, self.base_rev), shell=True).strip()
self.patch = subprocess.check_output(
"cd %s; git diff %s" % (workspace, self.base_rev), shell=True)
self.binaries = {}
self.initialization_lock = threading.Lock()
self.initialization_lock.acquire() # Released when init is done.
self._OpenLocalConnection()
self.local_receiver_thread = threading.Thread(
target=self._ListenLocalConnection)
self.local_receiver_thread.daemon = True
self.local_receiver_thread.start()
self.initialization_lock.acquire()
self.initialization_lock.release()
def _OpenLocalConnection(self):
self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT))
if code != 0:
raise RuntimeError("Failed to connect to local server")
compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket)
def _ListenLocalConnection(self):
release_lock_countdown = 1 # Pubkey.
self.local_receiver = compression.Receiver(self.local_socket)
while not self.local_receiver.IsDone():
data = self.local_receiver.Current()
if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT:
pubkey = data[1]
if not pubkey: raise RuntimeError("Received empty public key")
self.pubkey_fingerprint = pubkey
release_lock_countdown -= 1
if release_lock_countdown == 0:
self.initialization_lock.release()
release_lock_countdown -= 1 # Prevent repeated triggering.
self.local_receiver.Advance()
def Run(self, jobs):
self.indicator.Starting()
need_libv8 = False
for s in self.suites:
shell = s.shell()
if shell not in self.binaries:
path = os.path.join(self.context.shell_dir, shell)
# Check if this is a shared library build.
try:
ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path),
shell=True)
ldd = ldd.strip().split(" ")
assert ldd[0] == "libv8.so"
assert ldd[1] == "=>"
need_libv8 = True
binary_needs_libv8 = True
libv8 = signatures.ReadFileAndSignature(ldd[2])
except:
binary_needs_libv8 = False
binary = signatures.ReadFileAndSignature(path)
if binary[0] is None:
print("Error: Failed to create signature.")
assert binary[1] != 0
return binary[1]
binary.append(binary_needs_libv8)
self.binaries[shell] = binary
if need_libv8:
self.binaries["libv8.so"] = libv8
distro.Assign(self.suites, self.peers)
# Spawn one thread for each peer.
threads = []
for p in self.peers:
thread = threading.Thread(target=self._TalkToPeer, args=[p])
threads.append(thread)
thread.start()
try:
for thread in threads:
# Use a timeout so that signals (Ctrl+C) will be processed.
thread.join(timeout=10000000)
self._AnalyzePeerRuntimes()
except KeyboardInterrupt:
self.terminate = True
raise
except Exception, _e:
# If there's an exception we schedule an interruption for any
# remaining threads...
self.terminate = True
# ...and then reraise the exception to bail out.
raise
compression.Send(constants.END_OF_STREAM, self.local_socket)
self.local_socket.close()
if self.tests:
self._RunInternal(jobs)
self.indicator.Done()
return not self.failed
def _TalkToPeer(self, peer):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.context.timeout + 10)
code = sock.connect_ex((peer.address, constants.PEER_PORT))
if code == 0:
try:
peer.runtime = None
start_time = time.time()
packet = workpacket.WorkPacket(peer=peer, context=self.context,
base_revision=self.base_svn_rev,
patch=self.patch,
pubkey=self.pubkey_fingerprint)
data, test_map = packet.Pack(self.binaries)
compression.Send(data, sock)
compression.Send(constants.END_OF_STREAM, sock)
rec = compression.Receiver(sock)
while not rec.IsDone() and not self.terminate:
data_list = rec.Current()
for data in data_list:
test_id = data[0]
if test_id < 0:
# The peer is reporting an error.
with self.lock:
print("\nPeer %s reports error: %s" % (peer.address, data[1]))
continue
test = test_map.pop(test_id)
test.MergeResult(data)
try:
self.perfdata.UpdatePerfData(test)
except Exception, e:
print("UpdatePerfData exception: %s" % e)
pass # Just keep working.
with self.lock:
perf_key = self.perfdata.GetKey(test)
compression.Send(
[constants.INFORM_DURATION, perf_key, test.duration,
self.context.arch, self.context.mode],
self.local_socket)
has_unexpected_output = test.suite.HasUnexpectedOutput(test)
if has_unexpected_output:
self.failed.append(test)
if test.output.HasCrashed():
self.crashed += 1
else:
self.succeeded += 1
self.remaining -= 1
self.indicator.HasRun(test, has_unexpected_output)
rec.Advance()
peer.runtime = time.time() - start_time
except KeyboardInterrupt:
sock.close()
raise
except Exception, e:
print("Got exception: %s" % e)
pass # Fall back to local execution.
else:
compression.Send([constants.UNRESPONSIVE_PEER, peer.address],
self.local_socket)
sock.close()
if len(test_map) > 0:
# Some tests have not received any results. Run them locally.
print("\nNo results for %d tests, running them locally." % len(test_map))
self._EnqueueLocally(test_map)
def _EnqueueLocally(self, test_map):
with self.tests_lock:
for test in test_map:
self.tests.append(test_map[test])
def _AnalyzePeerRuntimes(self):
total_runtime = 0.0
total_work = 0.0
for p in self.peers:
if p.runtime is None:
return
total_runtime += p.runtime
total_work += p.assigned_work
for p in self.peers:
p.assigned_work /= total_work
p.runtime /= total_runtime
perf_correction = p.assigned_work / p.runtime
old_perf = p.relative_performance
p.relative_performance = (old_perf + perf_correction) / 2.0
compression.Send([constants.UPDATE_PERF, p.address,
p.relative_performance],
self.local_socket)
......@@ -49,18 +49,3 @@ class Context():
self.no_harness = no_harness
self.use_perf_data = use_perf_data
self.sancov_dir = sancov_dir
def Pack(self):
return [self.arch, self.mode, self.mode_flags, self.timeout, self.isolates,
self.command_prefix, self.extra_flags, self.noi18n,
self.random_seed, self.no_sorting, self.rerun_failures_count,
self.rerun_failures_max, self.predictable, self.no_harness,
self.use_perf_data, self.sancov_dir]
@staticmethod
def Unpack(packed):
# For the order of the fields, refer to Pack() above.
return Context(packed[0], packed[1], None, packed[2], False,
packed[3], packed[4], packed[5], packed[6], packed[7],
packed[8], packed[9], packed[10], packed[11], packed[12],
packed[13], packed[14], packed[15])
......@@ -51,11 +51,3 @@ class Output(object):
def HasTimedOut(self):
return self.timed_out
def Pack(self):
return [self.exit_code, self.timed_out, self.stdout, self.stderr, self.pid]
@staticmethod
def Unpack(packed):
# For the order of the fields, refer to Pack() above.
return Output(packed[0], packed[1], packed[2], packed[3], packed[4])
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
class Peer(object):
def __init__(self, address, jobs, rel_perf, pubkey):
self.address = address # string: IP address
self.jobs = jobs # integer: number of CPUs
self.relative_performance = rel_perf
self.pubkey = pubkey # string: pubkey's fingerprint
self.shells = set() # set of strings
self.needed_work = 0
self.assigned_work = 0
self.tests = [] # list of TestCase objects
self.trusting_me = False # This peer trusts my public key.
self.trusted = False # I trust this peer's public key.
def __str__(self):
return ("Peer at %s, jobs: %d, performance: %.2f, trust I/O: %s/%s" %
(self.address, self.jobs, self.relative_performance,
self.trusting_me, self.trusted))
def AddTests(self, shell):
"""Adds tests from |shell| to this peer.
Stops when self.needed_work reaches zero, or when all of shell's tests
are assigned."""
assert self.needed_work > 0
if shell.shell not in self.shells:
self.shells.add(shell.shell)
while len(shell.tests) > 0 and self.needed_work > 0:
t = shell.tests.pop()
self.needed_work -= t.duration
self.assigned_work += t.duration
shell.total_duration -= t.duration
self.tests.append(t)
def ForceAddOneTest(self, test, shell):
"""Forcibly adds another test to this peer, disregarding needed_work."""
if shell.shell not in self.shells:
self.shells.add(shell.shell)
self.needed_work -= test.duration
self.assigned_work += test.duration
shell.total_duration -= test.duration
self.tests.append(test)
def Pack(self):
"""Creates a JSON serializable representation of this Peer."""
return [self.address, self.jobs, self.relative_performance]
@staticmethod
def Unpack(packed):
"""Creates a Peer object built from a packed representation."""
pubkey_dummy = "" # Callers of this don't care (only the server does).
return Peer(packed[0], packed[1], packed[2], pubkey_dummy)
......@@ -26,8 +26,6 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from . import output
class TestCase(object):
def __init__(self, suite, path, variant=None, flags=None,
override_shell=None):
......@@ -50,41 +48,9 @@ class TestCase(object):
copy.env = self.env
return copy
def PackTask(self):
"""
Extracts those parts of this object that are required to run the test
and returns them as a JSON serializable object.
"""
assert self.id is not None
return [self.suitename(), self.path, self.variant, self.flags,
self.override_shell, list(self.outcomes or []),
self.id, self.env]
@staticmethod
def UnpackTask(task):
"""Creates a new TestCase object based on packed task data."""
# For the order of the fields, refer to PackTask() above.
test = TestCase(str(task[0]), task[1], task[2], task[3], task[4])
test.outcomes = frozenset(task[5])
test.id = task[6]
test.run = 1
test.env = task[7]
return test
def SetSuiteObject(self, suites):
self.suite = suites[self.suite]
def PackResult(self):
"""Serializes the output of the TestCase after it has run."""
self.suite.StripOutputForTransmit(self)
return [self.id, self.output.Pack(), self.duration]
def MergeResult(self, result):
"""Applies the contents of a Result to this object."""
assert result[0] == self.id
self.output = output.Output.Unpack(result[1])
self.duration = result[2]
def suitename(self):
return self.suite.name
......
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from . import context
from . import testcase
class WorkPacket(object):
def __init__(self, peer=None, context=None, tests=None, binaries=None,
base_revision=None, patch=None, pubkey=None):
self.peer = peer
self.context = context
self.tests = tests
self.binaries = binaries
self.base_revision = base_revision
self.patch = patch
self.pubkey_fingerprint = pubkey
def Pack(self, binaries_dict):
"""
Creates a JSON serializable object containing the data of this
work packet.
"""
need_libv8 = False
binaries = []
for shell in self.peer.shells:
prefetched_binary = binaries_dict[shell]
binaries.append({"name": shell,
"blob": prefetched_binary[0],
"sign": prefetched_binary[1]})
if prefetched_binary[2]:
need_libv8 = True
if need_libv8:
libv8 = binaries_dict["libv8.so"]
binaries.append({"name": "libv8.so",
"blob": libv8[0],
"sign": libv8[1]})
tests = []
test_map = {}
for t in self.peer.tests:
test_map[t.id] = t
tests.append(t.PackTask())
result = {
"binaries": binaries,
"pubkey": self.pubkey_fingerprint,
"context": self.context.Pack(),
"base_revision": self.base_revision,
"patch": self.patch,
"tests": tests
}
return result, test_map
@staticmethod
def Unpack(packed):
"""
Creates a WorkPacket object from the given packed representation.
"""
binaries = packed["binaries"]
pubkey_fingerprint = packed["pubkey"]
ctx = context.Context.Unpack(packed["context"])
base_revision = packed["base_revision"]
patch = packed["patch"]
tests = [ testcase.TestCase.UnpackTask(t) for t in packed["tests"] ]
return WorkPacket(context=ctx, tests=tests, binaries=binaries,
base_revision=base_revision, patch=patch,
pubkey=pubkey_fingerprint)
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import cStringIO as StringIO
try:
import ujson as json
except ImportError:
import json
import os
import struct
import zlib
from . import constants
def Send(obj, sock):
"""
Sends a JSON encodable object over the specified socket (zlib-compressed).
"""
obj = json.dumps(obj)
compression_level = 2 # 1 = fastest, 9 = best compression
compressed = zlib.compress(obj, compression_level)
payload = struct.pack('>i', len(compressed)) + compressed
sock.sendall(payload)
class Receiver(object):
def __init__(self, sock):
self.sock = sock
self.data = StringIO.StringIO()
self.datalength = 0
self._next = self._GetNext()
def IsDone(self):
return self._next == None
def Current(self):
return self._next
def Advance(self):
try:
self._next = self._GetNext()
except:
raise
def _GetNext(self):
try:
while self.datalength < constants.SIZE_T:
try:
chunk = self.sock.recv(8192)
except:
raise
if not chunk: return None
self._AppendData(chunk)
size = self._PopData(constants.SIZE_T)
size = struct.unpack(">i", size)[0]
while self.datalength < size:
try:
chunk = self.sock.recv(8192)
except:
raise
if not chunk: return None
self._AppendData(chunk)
result = self._PopData(size)
result = zlib.decompress(result)
result = json.loads(result)
if result == constants.END_OF_STREAM:
return None
return result
except:
raise
def _AppendData(self, new):
self.data.seek(0, os.SEEK_END)
self.data.write(new)
self.datalength += len(new)
def _PopData(self, length):
self.data.seek(0)
chunk = self.data.read(length)
remaining = self.data.read()
self.data.close()
self.data = StringIO.StringIO()
self.data.write(remaining)
assert self.datalength - length == len(remaining)
self.datalength = len(remaining)
return chunk
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
CLIENT_PORT = 9991 # Port for the local client to connect to.
PEER_PORT = 9992 # Port for peers on the network to connect to.
PRESENCE_PORT = 9993 # Port for presence daemon.
STATUS_PORT = 9994 # Port for network requests not related to workpackets.
END_OF_STREAM = "end of dtest stream" # Marker for end of network requests.
SIZE_T = 4 # Number of bytes used for network request size header.
# Messages understood by the local request handler.
ADD_TRUSTED = "add trusted"
INFORM_DURATION = "inform about duration"
REQUEST_PEERS = "get peers"
UNRESPONSIVE_PEER = "unresponsive peer"
REQUEST_PUBKEY_FINGERPRINT = "get pubkey fingerprint"
REQUEST_STATUS = "get status"
UPDATE_PERF = "update performance"
# Messages understood by the status request handler.
LIST_TRUSTED_PUBKEYS = "list trusted pubkeys"
GET_SIGNED_PUBKEY = "pass on signed pubkey"
NOTIFY_NEW_TRUSTED = "new trusted peer"
TRUST_YOU_NOW = "trust you now"
DO_YOU_TRUST = "do you trust"
#!/usr/bin/env python
# This code has been written by Sander Marechal and published at:
# http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/
# where the author has placed it in the public domain (see comment #6 at
# http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/#c6
# ).
# Some minor modifications have been made by the V8 authors. The work remains
# in the public domain.
import atexit
import os
from signal import SIGTERM
from signal import SIGINT
import sys
import time
class Daemon(object):
"""
A generic daemon class.
Usage: subclass the Daemon class and override the run() method
"""
def __init__(self, pidfile, stdin='/dev/null',
stdout='/dev/null', stderr='/dev/null'):
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.pidfile = pidfile
def daemonize(self):
"""
do the UNIX double-fork magic, see Stevens' "Advanced
Programming in the UNIX Environment" for details (ISBN 0201563177)
http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
"""
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(0)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = file(self.stdin, 'r')
so = file(self.stdout, 'a+')
se = file(self.stderr, 'a+', 0)
# TODO: (debug) re-enable this!
#os.dup2(si.fileno(), sys.stdin.fileno())
#os.dup2(so.fileno(), sys.stdout.fileno())
#os.dup2(se.fileno(), sys.stderr.fileno())
# write pidfile
atexit.register(self.delpid)
pid = str(os.getpid())
file(self.pidfile, 'w+').write("%s\n" % pid)
def delpid(self):
os.remove(self.pidfile)
def start(self):
"""
Start the daemon
"""
# Check for a pidfile to see if the daemon already runs
try:
pf = file(self.pidfile, 'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if pid:
message = "pidfile %s already exist. Daemon already running?\n"
sys.stderr.write(message % self.pidfile)
sys.exit(1)
# Start the daemon
self.daemonize()
self.run()
def stop(self):
"""
Stop the daemon
"""
# Get the pid from the pidfile
try:
pf = file(self.pidfile, 'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if not pid:
message = "pidfile %s does not exist. Daemon not running?\n"
sys.stderr.write(message % self.pidfile)
return # not an error in a restart
# Try killing the daemon process
try:
# Give the process a one-second chance to exit gracefully.
os.kill(pid, SIGINT)
time.sleep(1)
while 1:
os.kill(pid, SIGTERM)
time.sleep(0.1)
except OSError, err:
err = str(err)
if err.find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print str(err)
sys.exit(1)
def restart(self):
"""
Restart the daemon
"""
self.stop()
self.start()
def run(self):
"""
You should override this method when you subclass Daemon. It will be
called after the process has been daemonized by start() or restart().
"""
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import socket
import SocketServer
import StringIO
from . import compression
from . import constants
def LocalQuery(query):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
code = sock.connect_ex(("localhost", constants.CLIENT_PORT))
if code != 0: return None
compression.Send(query, sock)
compression.Send(constants.END_OF_STREAM, sock)
rec = compression.Receiver(sock)
data = None
while not rec.IsDone():
data = rec.Current()
assert data[0] == query[0]
data = data[1]
rec.Advance()
sock.close()
return data
class LocalHandler(SocketServer.BaseRequestHandler):
def handle(self):
rec = compression.Receiver(self.request)
while not rec.IsDone():
data = rec.Current()
action = data[0]
if action == constants.REQUEST_PEERS:
with self.server.daemon.peer_list_lock:
response = [ p.Pack() for p in self.server.daemon.peers
if p.trusting_me ]
compression.Send([action, response], self.request)
elif action == constants.UNRESPONSIVE_PEER:
self.server.daemon.DeletePeer(data[1])
elif action == constants.REQUEST_PUBKEY_FINGERPRINT:
compression.Send([action, self.server.daemon.pubkey_fingerprint],
self.request)
elif action == constants.REQUEST_STATUS:
compression.Send([action, self._GetStatusMessage()], self.request)
elif action == constants.ADD_TRUSTED:
fingerprint = self.server.daemon.CopyToTrusted(data[1])
compression.Send([action, fingerprint], self.request)
elif action == constants.INFORM_DURATION:
test_key = data[1]
test_duration = data[2]
arch = data[3]
mode = data[4]
self.server.daemon.AddPerfData(test_key, test_duration, arch, mode)
elif action == constants.UPDATE_PERF:
address = data[1]
perf = data[2]
self.server.daemon.UpdatePeerPerformance(data[1], data[2])
rec.Advance()
compression.Send(constants.END_OF_STREAM, self.request)
def _GetStatusMessage(self):
sio = StringIO.StringIO()
sio.write("Peers:\n")
with self.server.daemon.peer_list_lock:
for p in self.server.daemon.peers:
sio.write("%s\n" % p)
sio.write("My own jobs: %d, relative performance: %.2f\n" %
(self.server.daemon.jobs, self.server.daemon.relative_perf))
# Low-priority TODO: Return more information. Ideas:
# - currently running anything,
# - time since last job,
# - time since last repository fetch
# - number of workpackets/testcases handled since startup
# - slowest test(s)
result = sio.getvalue()
sio.close()
return result
class LocalSocketServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
def __init__(self, daemon):
SocketServer.TCPServer.__init__(self, ("localhost", constants.CLIENT_PORT),
LocalHandler)
self.daemon = daemon
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import multiprocessing
import os
import shutil
import subprocess
import threading
import time
from . import daemon
from . import local_handler
from . import presence_handler
from . import signatures
from . import status_handler
from . import work_handler
from ..network import perfdata
class Server(daemon.Daemon):
def __init__(self, pidfile, root, stdin="/dev/null",
stdout="/dev/null", stderr="/dev/null"):
super(Server, self).__init__(pidfile, stdin, stdout, stderr)
self.root = root
self.local_handler = None
self.local_handler_thread = None
self.work_handler = None
self.work_handler_thread = None
self.status_handler = None
self.status_handler_thread = None
self.presence_daemon = None
self.presence_daemon_thread = None
self.peers = []
self.jobs = multiprocessing.cpu_count()
self.peer_list_lock = threading.Lock()
self.perf_data_lock = None
self.presence_daemon_lock = None
self.datadir = os.path.join(self.root, "data")
pubkey_fingerprint_filename = os.path.join(self.datadir, "mypubkey")
with open(pubkey_fingerprint_filename) as f:
self.pubkey_fingerprint = f.read().strip()
self.relative_perf_filename = os.path.join(self.datadir, "myperf")
if os.path.exists(self.relative_perf_filename):
with open(self.relative_perf_filename) as f:
try:
self.relative_perf = float(f.read())
except:
self.relative_perf = 1.0
else:
self.relative_perf = 1.0
def run(self):
os.nice(20)
self.ip = presence_handler.GetOwnIP()
self.perf_data_manager = perfdata.PerfDataManager(self.datadir)
self.perf_data_lock = threading.Lock()
self.local_handler = local_handler.LocalSocketServer(self)
self.local_handler_thread = threading.Thread(
target=self.local_handler.serve_forever)
self.local_handler_thread.start()
self.work_handler = work_handler.WorkSocketServer(self)
self.work_handler_thread = threading.Thread(
target=self.work_handler.serve_forever)
self.work_handler_thread.start()
self.status_handler = status_handler.StatusSocketServer(self)
self.status_handler_thread = threading.Thread(
target=self.status_handler.serve_forever)
self.status_handler_thread.start()
self.presence_daemon = presence_handler.PresenceDaemon(self)
self.presence_daemon_thread = threading.Thread(
target=self.presence_daemon.serve_forever)
self.presence_daemon_thread.start()
self.presence_daemon.FindPeers()
time.sleep(0.5) # Give those peers some time to reply.
with self.peer_list_lock:
for p in self.peers:
if p.address == self.ip: continue
status_handler.RequestTrustedPubkeys(p, self)
while True:
try:
self.PeriodicTasks()
time.sleep(60)
except Exception, e:
print("MAIN LOOP EXCEPTION: %s" % e)
self.Shutdown()
break
except KeyboardInterrupt:
self.Shutdown()
break
def Shutdown(self):
with open(self.relative_perf_filename, "w") as f:
f.write("%s" % self.relative_perf)
self.presence_daemon.shutdown()
self.presence_daemon.server_close()
self.local_handler.shutdown()
self.local_handler.server_close()
self.work_handler.shutdown()
self.work_handler.server_close()
self.status_handler.shutdown()
self.status_handler.server_close()
def PeriodicTasks(self):
# If we know peers we don't trust, see if someone else trusts them.
with self.peer_list_lock:
for p in self.peers:
if p.trusted: continue
if self.IsTrusted(p.pubkey):
p.trusted = True
status_handler.ITrustYouNow(p)
continue
for p2 in self.peers:
if not p2.trusted: continue
status_handler.TryTransitiveTrust(p2, p.pubkey, self)
# TODO: Ping for more peers waiting to be discovered.
# TODO: Update the checkout (if currently idle).
def AddPeer(self, peer):
with self.peer_list_lock:
for p in self.peers:
if p.address == peer.address:
return
self.peers.append(peer)
if peer.trusted:
status_handler.ITrustYouNow(peer)
def DeletePeer(self, peer_address):
with self.peer_list_lock:
for i in xrange(len(self.peers)):
if self.peers[i].address == peer_address:
del self.peers[i]
return
def MarkPeerAsTrusting(self, peer_address):
with self.peer_list_lock:
for p in self.peers:
if p.address == peer_address:
p.trusting_me = True
break
def UpdatePeerPerformance(self, peer_address, performance):
with self.peer_list_lock:
for p in self.peers:
if p.address == peer_address:
p.relative_performance = performance
def CopyToTrusted(self, pubkey_filename):
with open(pubkey_filename, "r") as f:
lines = f.readlines()
fingerprint = lines[-1].strip()
target_filename = self._PubkeyFilename(fingerprint)
shutil.copy(pubkey_filename, target_filename)
with self.peer_list_lock:
for peer in self.peers:
if peer.address == self.ip: continue
if peer.pubkey == fingerprint:
status_handler.ITrustYouNow(peer)
else:
result = self.SignTrusted(fingerprint)
status_handler.NotifyNewTrusted(peer, result)
return fingerprint
def _PubkeyFilename(self, pubkey_fingerprint):
return os.path.join(self.root, "trusted", "%s.pem" % pubkey_fingerprint)
def IsTrusted(self, pubkey_fingerprint):
return os.path.exists(self._PubkeyFilename(pubkey_fingerprint))
def ListTrusted(self):
path = os.path.join(self.root, "trusted")
if not os.path.exists(path): return []
return [ f[:-4] for f in os.listdir(path) if f.endswith(".pem") ]
def SignTrusted(self, pubkey_fingerprint):
if not self.IsTrusted(pubkey_fingerprint):
return []
filename = self._PubkeyFilename(pubkey_fingerprint)
result = signatures.ReadFileAndSignature(filename) # Format: [key, sig].
return [pubkey_fingerprint, result[0], result[1], self.pubkey_fingerprint]
def AcceptNewTrusted(self, data):
# The format of |data| matches the return value of |SignTrusted()|.
if not data: return
fingerprint = data[0]
pubkey = data[1]
signature = data[2]
signer = data[3]
if not self.IsTrusted(signer):
return
if self.IsTrusted(fingerprint):
return # Already trusted.
filename = self._PubkeyFilename(fingerprint)
signer_pubkeyfile = self._PubkeyFilename(signer)
if not signatures.VerifySignature(filename, pubkey, signature,
signer_pubkeyfile):
return
return # Nothing more to do.
def AddPerfData(self, test_key, duration, arch, mode):
data_store = self.perf_data_manager.GetStore(arch, mode)
data_store.RawUpdatePerfData(str(test_key), duration)
def CompareOwnPerf(self, test, arch, mode):
data_store = self.perf_data_manager.GetStore(arch, mode)
observed = data_store.FetchPerfData(test)
if not observed: return
own_perf_estimate = observed / test.duration
with self.perf_data_lock:
kLearnRateLimiter = 9999
self.relative_perf *= kLearnRateLimiter
self.relative_perf += own_perf_estimate
self.relative_perf /= (kLearnRateLimiter + 1)
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import socket
import SocketServer
import threading
try:
import ujson as json
except:
import json
from . import constants
from ..objects import peer
STARTUP_REQUEST = "V8 test peer starting up"
STARTUP_RESPONSE = "Let's rock some tests!"
EXIT_REQUEST = "V8 testing peer going down"
def GetOwnIP():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
class PresenceHandler(SocketServer.BaseRequestHandler):
def handle(self):
data = json.loads(self.request[0].strip())
if data[0] == STARTUP_REQUEST:
jobs = data[1]
relative_perf = data[2]
pubkey_fingerprint = data[3]
trusted = self.server.daemon.IsTrusted(pubkey_fingerprint)
response = [STARTUP_RESPONSE, self.server.daemon.jobs,
self.server.daemon.relative_perf,
self.server.daemon.pubkey_fingerprint, trusted]
response = json.dumps(response)
self.server.SendTo(self.client_address[0], response)
p = peer.Peer(self.client_address[0], jobs, relative_perf,
pubkey_fingerprint)
p.trusted = trusted
self.server.daemon.AddPeer(p)
elif data[0] == STARTUP_RESPONSE:
jobs = data[1]
perf = data[2]
pubkey_fingerprint = data[3]
p = peer.Peer(self.client_address[0], jobs, perf, pubkey_fingerprint)
p.trusted = self.server.daemon.IsTrusted(pubkey_fingerprint)
p.trusting_me = data[4]
self.server.daemon.AddPeer(p)
elif data[0] == EXIT_REQUEST:
self.server.daemon.DeletePeer(self.client_address[0])
if self.client_address[0] == self.server.daemon.ip:
self.server.shutdown_lock.release()
class PresenceDaemon(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
def __init__(self, daemon):
self.daemon = daemon
address = (daemon.ip, constants.PRESENCE_PORT)
SocketServer.UDPServer.__init__(self, address, PresenceHandler)
self.shutdown_lock = threading.Lock()
def shutdown(self):
self.shutdown_lock.acquire()
self.SendToAll(json.dumps([EXIT_REQUEST]))
self.shutdown_lock.acquire()
self.shutdown_lock.release()
SocketServer.UDPServer.shutdown(self)
def SendTo(self, target, message):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(message, (target, constants.PRESENCE_PORT))
sock.close()
def SendToAll(self, message):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ip = self.daemon.ip.split(".")
for i in range(1, 254):
ip[-1] = str(i)
sock.sendto(message, (".".join(ip), constants.PRESENCE_PORT))
sock.close()
def FindPeers(self):
request = [STARTUP_REQUEST, self.daemon.jobs, self.daemon.relative_perf,
self.daemon.pubkey_fingerprint]
request = json.dumps(request)
self.SendToAll(request)
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import base64
import os
import subprocess
def ReadFileAndSignature(filename):
with open(filename, "rb") as f:
file_contents = base64.b64encode(f.read())
signature_file = filename + ".signature"
if (not os.path.exists(signature_file) or
os.path.getmtime(signature_file) < os.path.getmtime(filename)):
private_key = "~/.ssh/v8_dtest"
code = subprocess.call("openssl dgst -out %s -sign %s %s" %
(signature_file, private_key, filename),
shell=True)
if code != 0: return [None, code]
with open(signature_file) as f:
signature = base64.b64encode(f.read())
return [file_contents, signature]
def VerifySignature(filename, file_contents, signature, pubkeyfile):
with open(filename, "wb") as f:
f.write(base64.b64decode(file_contents))
signature_file = filename + ".foreign_signature"
with open(signature_file, "wb") as f:
f.write(base64.b64decode(signature))
code = subprocess.call("openssl dgst -verify %s -signature %s %s" %
(pubkeyfile, signature_file, filename),
shell=True)
matched = (code == 0)
if not matched:
os.remove(signature_file)
os.remove(filename)
return matched
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import socket
import SocketServer
from . import compression
from . import constants
def _StatusQuery(peer, query):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
code = sock.connect_ex((peer.address, constants.STATUS_PORT))
if code != 0:
# TODO(jkummerow): disconnect (after 3 failures?)
return
compression.Send(query, sock)
compression.Send(constants.END_OF_STREAM, sock)
rec = compression.Receiver(sock)
data = None
while not rec.IsDone():
data = rec.Current()
assert data[0] == query[0]
data = data[1]
rec.Advance()
sock.close()
return data
def RequestTrustedPubkeys(peer, server):
pubkey_list = _StatusQuery(peer, [constants.LIST_TRUSTED_PUBKEYS])
for pubkey in pubkey_list:
if server.IsTrusted(pubkey): continue
result = _StatusQuery(peer, [constants.GET_SIGNED_PUBKEY, pubkey])
server.AcceptNewTrusted(result)
def NotifyNewTrusted(peer, data):
_StatusQuery(peer, [constants.NOTIFY_NEW_TRUSTED] + data)
def ITrustYouNow(peer):
_StatusQuery(peer, [constants.TRUST_YOU_NOW])
def TryTransitiveTrust(peer, pubkey, server):
if _StatusQuery(peer, [constants.DO_YOU_TRUST, pubkey]):
result = _StatusQuery(peer, [constants.GET_SIGNED_PUBKEY, pubkey])
server.AcceptNewTrusted(result)
class StatusHandler(SocketServer.BaseRequestHandler):
def handle(self):
rec = compression.Receiver(self.request)
while not rec.IsDone():
data = rec.Current()
action = data[0]
if action == constants.LIST_TRUSTED_PUBKEYS:
response = self.server.daemon.ListTrusted()
compression.Send([action, response], self.request)
elif action == constants.GET_SIGNED_PUBKEY:
response = self.server.daemon.SignTrusted(data[1])
compression.Send([action, response], self.request)
elif action == constants.NOTIFY_NEW_TRUSTED:
self.server.daemon.AcceptNewTrusted(data[1:])
pass # No response.
elif action == constants.TRUST_YOU_NOW:
self.server.daemon.MarkPeerAsTrusting(self.client_address[0])
pass # No response.
elif action == constants.DO_YOU_TRUST:
response = self.server.daemon.IsTrusted(data[1])
compression.Send([action, response], self.request)
rec.Advance()
compression.Send(constants.END_OF_STREAM, self.request)
class StatusSocketServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
def __init__(self, daemon):
address = (daemon.ip, constants.STATUS_PORT)
SocketServer.TCPServer.__init__(self, address, StatusHandler)
self.daemon = daemon
# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
import SocketServer
import stat
import subprocess
import threading
from . import compression
from . import constants
from . import signatures
from ..network import endpoint
from ..objects import workpacket
class WorkHandler(SocketServer.BaseRequestHandler):
def handle(self):
rec = compression.Receiver(self.request)
while not rec.IsDone():
data = rec.Current()
with self.server.job_lock:
self._WorkOnWorkPacket(data)
rec.Advance()
def _WorkOnWorkPacket(self, data):
server_root = self.server.daemon.root
v8_root = os.path.join(server_root, "v8")
os.chdir(v8_root)
packet = workpacket.WorkPacket.Unpack(data)
self.ctx = packet.context
self.ctx.shell_dir = os.path.join("out",
"%s.%s" % (self.ctx.arch, self.ctx.mode))
if not os.path.isdir(self.ctx.shell_dir):
os.makedirs(self.ctx.shell_dir)
for binary in packet.binaries:
if not self._UnpackBinary(binary, packet.pubkey_fingerprint):
return
if not self._CheckoutRevision(packet.base_revision):
return
if not self._ApplyPatch(packet.patch):
return
tests = packet.tests
endpoint.Execute(v8_root, self.ctx, tests, self.request, self.server.daemon)
self._SendResponse()
def _SendResponse(self, error_message=None):
try:
if error_message:
compression.Send([[-1, error_message]], self.request)
compression.Send(constants.END_OF_STREAM, self.request)
return
except Exception, e:
pass # Peer is gone. There's nothing we can do.
# Clean up.
self._Call("git checkout -f")
self._Call("git clean -f -d")
self._Call("rm -rf %s" % self.ctx.shell_dir)
def _UnpackBinary(self, binary, pubkey_fingerprint):
binary_name = binary["name"]
if binary_name == "libv8.so":
libdir = os.path.join(self.ctx.shell_dir, "lib.target")
if not os.path.exists(libdir): os.makedirs(libdir)
target = os.path.join(libdir, binary_name)
else:
target = os.path.join(self.ctx.shell_dir, binary_name)
pubkeyfile = "../trusted/%s.pem" % pubkey_fingerprint
if not signatures.VerifySignature(target, binary["blob"],
binary["sign"], pubkeyfile):
self._SendResponse("Signature verification failed")
return False
os.chmod(target, stat.S_IRWXU)
return True
def _CheckoutRevision(self, base_svn_revision):
get_hash_cmd = (
"git log -1 --format=%%H --remotes --grep='^git-svn-id:.*@%s'" %
base_svn_revision)
try:
base_revision = subprocess.check_output(get_hash_cmd, shell=True)
if not base_revision: raise ValueError
except:
self._Call("git fetch")
try:
base_revision = subprocess.check_output(get_hash_cmd, shell=True)
if not base_revision: raise ValueError
except:
self._SendResponse("Base revision not found.")
return False
code = self._Call("git checkout -f %s" % base_revision)
if code != 0:
self._SendResponse("Error trying to check out base revision.")
return False
code = self._Call("git clean -f -d")
if code != 0:
self._SendResponse("Failed to reset checkout")
return False
return True
def _ApplyPatch(self, patch):
if not patch: return True # Just skip if the patch is empty.
patchfilename = "_dtest_incoming_patch.patch"
with open(patchfilename, "w") as f:
f.write(patch)
code = self._Call("git apply %s" % patchfilename)
if code != 0:
self._SendResponse("Error applying patch.")
return False
return True
def _Call(self, cmd):
return subprocess.call(cmd, shell=True)
class WorkSocketServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
def __init__(self, daemon):
address = (daemon.ip, constants.PEER_PORT)
SocketServer.TCPServer.__init__(self, address, WorkHandler)
self.job_lock = threading.Lock()
self.daemon = daemon
......@@ -24,7 +24,6 @@ from testrunner.local import testsuite
from testrunner.local import utils
from testrunner.local import verbose
from testrunner.local.variants import ALL_VARIANTS
from testrunner.network import network_execution
from testrunner.objects import context
......@@ -163,11 +162,6 @@ class StandardTestRunner(base_runner.BaseTestRunner):
parser.add_option("--no-harness", "--noharness",
help="Run without test harness of a given suite",
default=False, action="store_true")
parser.add_option("--network", help="Distribute tests on the network",
default=False, dest="network", action="store_true")
parser.add_option("--no-network", "--nonetwork",
help="Don't distribute tests on the network",
dest="network", action="store_false")
parser.add_option("--no-presubmit", "--nopresubmit",
help='Skip presubmit checks (deprecated)',
default=False, dest="no_presubmit", action="store_true")
......@@ -248,10 +242,6 @@ class StandardTestRunner(base_runner.BaseTestRunner):
print("sancov-dir %s doesn't exist" % self.sancov_dir)
raise base_runner.TestRunnerError()
if options.command_prefix and options.network:
print("Specifying --command-prefix disables network distribution, "
"running tests locally.")
options.network = False
options.command_prefix = shlex.split(options.command_prefix)
options.extra_flags = sum(map(shlex.split, options.extra_flags), [])
......@@ -319,8 +309,6 @@ class StandardTestRunner(base_runner.BaseTestRunner):
if options.valgrind:
run_valgrind = os.path.join("tools", "run-valgrind.py")
# This is OK for distributed running, so we don't need to disable
# network.
options.command_prefix = (["python", "-u", run_valgrind] +
options.command_prefix)
def CheckTestMode(name, option):
......@@ -473,7 +461,7 @@ class StandardTestRunner(base_runner.BaseTestRunner):
if options.report:
verbose.PrintReport(all_tests)
# Run the tests, either locally or distributed on the network.
# Run the tests.
start_time = time.time()
progress_indicator = progress.IndicatorNotifier()
progress_indicator.Register(
......@@ -491,32 +479,7 @@ class StandardTestRunner(base_runner.BaseTestRunner):
progress_indicator.Register(progress.FlakinessTestProgressIndicator(
options.flakiness_results))
run_networked = options.network
if not run_networked:
if options.verbose:
print("Network distribution disabled, running tests locally.")
elif utils.GuessOS() != "linux":
print("Network distribution is only supported on Linux, sorry!")
run_networked = False
peers = []
if run_networked:
peers = network_execution.GetPeers()
if not peers:
print("No connection to distribution server; running tests locally.")
run_networked = False
elif len(peers) == 1:
print("No other peers on the network; running tests locally.")
run_networked = False
elif num_tests <= 100:
print("Less than 100 tests, running them locally.")
run_networked = False
if run_networked:
runner = network_execution.NetworkedRunner(
suites, progress_indicator, ctx, peers, base_runner.BASE_DIR)
else:
runner = execution.Runner(suites, progress_indicator, ctx)
exit_code = runner.Run(options.j)
overall_duration = time.time() - start_time
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment