Commit a0728e86 authored by Ben L. Titzer's avatar Ben L. Titzer Committed by Commit Bot

[d8] Remove maximum workers limitation

This CL refactors the lifetime management of the v8::Worker C++ object
and in the process lifts the 100 maximum worker limitation. To do this,
it uses a Managed<v8::Worker> heap object and attaches the managed to
the API worker object.

R=mstarzinger@chromium.org
BUG=v8:9524

Change-Id: I279b7aeb6645a87f9108ee6f572105739721cef4
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/1715453
Commit-Queue: Ben Titzer <titzer@chromium.org>
Reviewed-by: 's avatarClemens Hammacher <clemensh@chromium.org>
Reviewed-by: 's avatarMichael Starzinger <mstarzinger@chromium.org>
Cr-Commit-Position: refs/heads/master@{#62932}
parent 26447401
...@@ -67,6 +67,8 @@ class V8_BASE_EXPORT Mutex final { ...@@ -67,6 +67,8 @@ class V8_BASE_EXPORT Mutex final {
return native_handle_; return native_handle_;
} }
V8_INLINE void AssertHeld() { DCHECK_EQ(1, level_); }
private: private:
NativeHandle native_handle_; NativeHandle native_handle_;
#ifdef DEBUG #ifdef DEBUG
......
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
#include "src/init/v8.h" #include "src/init/v8.h"
#include "src/interpreter/interpreter.h" #include "src/interpreter/interpreter.h"
#include "src/logging/counters.h" #include "src/logging/counters.h"
#include "src/objects/managed.h"
#include "src/objects/objects-inl.h" #include "src/objects/objects-inl.h"
#include "src/objects/objects.h" #include "src/objects/objects.h"
#include "src/parsing/parse-info.h" #include "src/parsing/parse-info.h"
...@@ -76,7 +77,6 @@ namespace { ...@@ -76,7 +77,6 @@ namespace {
const int kMB = 1024 * 1024; const int kMB = 1024 * 1024;
const int kMaxWorkers = 100;
const int kMaxSerializerMemoryUsage = const int kMaxSerializerMemoryUsage =
1 * kMB; // Arbitrary maximum for testing. 1 * kMB; // Arbitrary maximum for testing.
...@@ -227,14 +227,13 @@ Worker* GetWorkerFromInternalField(Isolate* isolate, Local<Object> object) { ...@@ -227,14 +227,13 @@ Worker* GetWorkerFromInternalField(Isolate* isolate, Local<Object> object) {
return nullptr; return nullptr;
} }
Worker* worker = i::Handle<i::Object> handle = Utils::OpenHandle(*object->GetInternalField(0));
static_cast<Worker*>(object->GetAlignedPointerFromInternalField(0)); if (handle->IsSmi()) {
if (worker == nullptr) {
Throw(isolate, "Worker is defunct because main thread is terminating"); Throw(isolate, "Worker is defunct because main thread is terminating");
return nullptr; return nullptr;
} }
auto managed = i::Handle<i::Managed<Worker>>::cast(handle);
return worker; return managed->raw();
} }
base::Thread::Options GetThreadOptions(const char* name) { base::Thread::Options GetThreadOptions(const char* name) {
...@@ -333,7 +332,7 @@ const base::TimeTicks Shell::kInitialTicks = ...@@ -333,7 +332,7 @@ const base::TimeTicks Shell::kInitialTicks =
Global<Function> Shell::stringify_function_; Global<Function> Shell::stringify_function_;
base::LazyMutex Shell::workers_mutex_; base::LazyMutex Shell::workers_mutex_;
bool Shell::allow_new_workers_ = true; bool Shell::allow_new_workers_ = true;
std::vector<Worker*> Shell::workers_; std::unordered_set<std::shared_ptr<Worker>> Shell::running_workers_;
std::vector<ExternalizedContents> Shell::externalized_contents_; std::vector<ExternalizedContents> Shell::externalized_contents_;
std::atomic<bool> Shell::script_executed_{false}; std::atomic<bool> Shell::script_executed_{false};
base::LazyMutex Shell::isolate_status_lock_; base::LazyMutex Shell::isolate_status_lock_;
...@@ -1392,30 +1391,33 @@ void Shell::WorkerNew(const v8::FunctionCallbackInfo<v8::Value>& args) { ...@@ -1392,30 +1391,33 @@ void Shell::WorkerNew(const v8::FunctionCallbackInfo<v8::Value>& args) {
return; return;
} }
{ // Initialize the embedder field to 0; if we return early without
base::MutexGuard lock_guard(workers_mutex_.Pointer());
if (workers_.size() >= kMaxWorkers) {
Throw(args.GetIsolate(), "Too many workers, I won't let you create more");
return;
}
// Initialize the embedder field to nullptr; if we return early without
// creating a new Worker (because the main thread is terminating) we can // creating a new Worker (because the main thread is terminating) we can
// early-out from the instance calls. // early-out from the instance calls.
args.Holder()->SetAlignedPointerInInternalField(0, nullptr); args.Holder()->SetInternalField(0, v8::Integer::New(isolate, 0));
{
// Don't allow workers to create more workers if the main thread
// is waiting for existing running workers to terminate.
base::MutexGuard lock_guard(workers_mutex_.Pointer());
if (!allow_new_workers_) return; if (!allow_new_workers_) return;
Worker* worker = new Worker;
args.Holder()->SetAlignedPointerInInternalField(0, worker);
workers_.push_back(worker);
String::Utf8Value script(args.GetIsolate(), source); String::Utf8Value script(args.GetIsolate(), source);
if (!*script) { if (!*script) {
Throw(args.GetIsolate(), "Can't get worker script"); Throw(args.GetIsolate(), "Can't get worker script");
return; return;
} }
worker->StartExecuteInThread(*script);
// The C++ worker object's lifetime is shared between the Managed<Worker>
// object on the heap, which the JavaScript object points to, and an
// internal std::shared_ptr in the worker thread itself.
auto worker = std::make_shared<Worker>(*script);
i::Isolate* i_isolate = reinterpret_cast<i::Isolate*>(isolate);
const size_t kWorkerSizeEstimate = 4 * 1024 * 1024; // stack + heap.
i::Handle<i::Object> managed = i::Managed<Worker>::FromSharedPtr(
i_isolate, kWorkerSizeEstimate, worker);
args.Holder()->SetInternalField(0, Utils::ToLocal(managed));
Worker::StartWorkerThread(std::move(worker));
} }
} }
...@@ -1475,7 +1477,7 @@ void Shell::QuitOnce(v8::FunctionCallbackInfo<v8::Value>* args) { ...@@ -1475,7 +1477,7 @@ void Shell::QuitOnce(v8::FunctionCallbackInfo<v8::Value>* args) {
int exit_code = (*args)[0] int exit_code = (*args)[0]
->Int32Value(args->GetIsolate()->GetCurrentContext()) ->Int32Value(args->GetIsolate()->GetCurrentContext())
.FromMaybe(0); .FromMaybe(0);
CleanupWorkers(); WaitForRunningWorkers();
args->GetIsolate()->Exit(); args->GetIsolate()->Exit();
OnExit(args->GetIsolate()); OnExit(args->GetIsolate());
base::OS::ExitProcess(exit_code); base::OS::ExitProcess(exit_code);
...@@ -2550,11 +2552,11 @@ void SerializationDataQueue::Clear() { ...@@ -2550,11 +2552,11 @@ void SerializationDataQueue::Clear() {
data_.clear(); data_.clear();
} }
Worker::Worker() Worker::Worker(const char* script)
: in_semaphore_(0), : in_semaphore_(0),
out_semaphore_(0), out_semaphore_(0),
thread_(nullptr), thread_(nullptr),
script_(nullptr), script_(i::StrDup(script)),
running_(false) {} running_(false) {}
Worker::~Worker() { Worker::~Worker() {
...@@ -2562,15 +2564,26 @@ Worker::~Worker() { ...@@ -2562,15 +2564,26 @@ Worker::~Worker() {
thread_ = nullptr; thread_ = nullptr;
delete[] script_; delete[] script_;
script_ = nullptr; script_ = nullptr;
in_queue_.Clear();
out_queue_.Clear();
} }
void Worker::StartExecuteInThread(const char* script) { void Worker::StartWorkerThread(std::shared_ptr<Worker> worker) {
running_ = true; worker->running_ = true;
script_ = i::StrDup(script); auto thread = new WorkerThread(worker);
thread_ = new WorkerThread(this); worker->thread_ = thread;
thread_->Start(); thread->Start();
Shell::AddRunningWorker(std::move(worker));
}
void Worker::WorkerThread::Run() {
// Prevent a lifetime cycle from Worker -> WorkerThread -> Worker.
// We must clear the worker_ field of the thread, but we keep the
// worker alive via a stack root until the thread finishes execution
// and removes itself from the running set. Thereafter the only
// remaining reference can be from a JavaScript object via a Managed.
auto worker = std::move(worker_);
worker_ = nullptr;
worker->ExecuteInThread();
Shell::RemoveRunningWorker(worker);
} }
void Worker::PostMessage(std::unique_ptr<SerializationData> data) { void Worker::PostMessage(std::unique_ptr<SerializationData> data) {
...@@ -2936,7 +2949,7 @@ int Shell::RunMain(Isolate* isolate, int argc, char* argv[], bool last_run) { ...@@ -2936,7 +2949,7 @@ int Shell::RunMain(Isolate* isolate, int argc, char* argv[], bool last_run) {
options.isolate_sources[i].WaitForThread(); options.isolate_sources[i].WaitForThread();
} }
} }
CleanupWorkers(); WaitForRunningWorkers();
// In order to finish successfully, success must be != expected_to_throw. // In order to finish successfully, success must be != expected_to_throw.
return success == Shell::options.expected_to_throw ? 1 : 0; return success == Shell::options.expected_to_throw ? 1 : 0;
} }
...@@ -3267,24 +3280,35 @@ MaybeLocal<Value> Shell::DeserializeValue( ...@@ -3267,24 +3280,35 @@ MaybeLocal<Value> Shell::DeserializeValue(
return deserializer.ReadValue(context); return deserializer.ReadValue(context);
} }
void Shell::CleanupWorkers() { void Shell::AddRunningWorker(std::shared_ptr<Worker> worker) {
// Make a copy of workers_, because we don't want to call Worker::Terminate workers_mutex_.Pointer()->AssertHeld(); // caller should hold the mutex.
// while holding the workers_mutex_ lock. Otherwise, if a worker is about to running_workers_.insert(worker);
// create a new Worker, it would deadlock. }
std::vector<Worker*> workers_copy;
void Shell::RemoveRunningWorker(const std::shared_ptr<Worker>& worker) {
base::MutexGuard lock_guard(workers_mutex_.Pointer());
auto it = running_workers_.find(worker);
if (it != running_workers_.end()) running_workers_.erase(it);
}
void Shell::WaitForRunningWorkers() {
// Make a copy of running_workers_, because we don't want to call
// Worker::Terminate while holding the workers_mutex_ lock. Otherwise, if a
// worker is about to create a new Worker, it would deadlock.
std::unordered_set<std::shared_ptr<Worker>> workers_copy;
{ {
base::MutexGuard lock_guard(workers_mutex_.Pointer()); base::MutexGuard lock_guard(workers_mutex_.Pointer());
allow_new_workers_ = false; allow_new_workers_ = false;
workers_copy.swap(workers_); workers_copy.swap(running_workers_);
} }
for (Worker* worker : workers_copy) { for (auto& worker : workers_copy) {
worker->WaitForThread(); worker->WaitForThread();
delete worker;
} }
// Now that all workers are terminated, we can re-enable Worker creation. // Now that all workers are terminated, we can re-enable Worker creation.
base::MutexGuard lock_guard(workers_mutex_.Pointer()); base::MutexGuard lock_guard(workers_mutex_.Pointer());
DCHECK(running_workers_.empty());
allow_new_workers_ = true; allow_new_workers_ = true;
externalized_contents_.clear(); externalized_contents_.clear();
} }
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include <queue> #include <queue>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <unordered_set>
#include <vector> #include <vector>
#include "src/base/once.h" #include "src/base/once.h"
...@@ -207,12 +208,9 @@ class SerializationDataQueue { ...@@ -207,12 +208,9 @@ class SerializationDataQueue {
class Worker { class Worker {
public: public:
Worker(); explicit Worker(const char* script);
~Worker(); ~Worker();
// Run the given script on this Worker. This function should only be called
// once, and should only be called by the thread that created the Worker.
void StartExecuteInThread(const char* script);
// Post a message to the worker's incoming message queue. The worker will // Post a message to the worker's incoming message queue. The worker will
// take ownership of the SerializationData. // take ownership of the SerializationData.
// This function should only be called by the thread that created the Worker. // This function should only be called by the thread that created the Worker.
...@@ -231,17 +229,20 @@ class Worker { ...@@ -231,17 +229,20 @@ class Worker {
// This function can be called by any thread. // This function can be called by any thread.
void WaitForThread(); void WaitForThread();
// Start running the given worker in another thread.
static void StartWorkerThread(std::shared_ptr<Worker> worker);
private: private:
class WorkerThread : public base::Thread { class WorkerThread : public base::Thread {
public: public:
explicit WorkerThread(Worker* worker) explicit WorkerThread(std::shared_ptr<Worker> worker)
: base::Thread(base::Thread::Options("WorkerThread")), : base::Thread(base::Thread::Options("WorkerThread")),
worker_(worker) {} worker_(std::move(worker)) {}
void Run() override { worker_->ExecuteInThread(); } void Run() override;
private: private:
Worker* worker_; std::shared_ptr<Worker> worker_;
}; };
void ExecuteInThread(); void ExecuteInThread();
...@@ -378,7 +379,6 @@ class Shell : public i::AllStatic { ...@@ -378,7 +379,6 @@ class Shell : public i::AllStatic {
Isolate* isolate, Local<Value> value, Local<Value> transfer); Isolate* isolate, Local<Value> value, Local<Value> transfer);
static MaybeLocal<Value> DeserializeValue( static MaybeLocal<Value> DeserializeValue(
Isolate* isolate, std::unique_ptr<SerializationData> data); Isolate* isolate, std::unique_ptr<SerializationData> data);
static void CleanupWorkers();
static int* LookupCounter(const char* name); static int* LookupCounter(const char* name);
static void* CreateHistogram(const char* name, int min, int max, static void* CreateHistogram(const char* name, int min, int max,
size_t buckets); size_t buckets);
...@@ -493,6 +493,10 @@ class Shell : public i::AllStatic { ...@@ -493,6 +493,10 @@ class Shell : public i::AllStatic {
!options.test_shell; !options.test_shell;
} }
static void WaitForRunningWorkers();
static void AddRunningWorker(std::shared_ptr<Worker> worker);
static void RemoveRunningWorker(const std::shared_ptr<Worker>& worker);
private: private:
static Global<Context> evaluation_context_; static Global<Context> evaluation_context_;
static base::OnceType quit_once_; static base::OnceType quit_once_;
...@@ -509,7 +513,7 @@ class Shell : public i::AllStatic { ...@@ -509,7 +513,7 @@ class Shell : public i::AllStatic {
static base::LazyMutex workers_mutex_; // Guards the following members. static base::LazyMutex workers_mutex_; // Guards the following members.
static bool allow_new_workers_; static bool allow_new_workers_;
static std::vector<Worker*> workers_; static std::unordered_set<std::shared_ptr<Worker>> running_workers_;
static std::vector<ExternalizedContents> externalized_contents_; static std::vector<ExternalizedContents> externalized_contents_;
// Multiple isolates may update this flag concurrently. // Multiple isolates may update this flag concurrently.
......
// Copyright 2019 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
const kBatchSize = 10;
const kNumBatches = 10;
function RunWorkerBatch(count) {
let script = `postMessage(42)`;
// Launch workers.
let workers = new Array(count);
for (let i = 0; i < count; i++) {
workers[i] = new Worker(script, {type : 'string'});
}
// Terminate half of the workers early.
for (let i = 0; i < workers.length; i++) {
if ((i & 1) == 1) workers[i].terminate();
}
// Get messages from some workers.
for (let i = 0; i < workers.length; i++) {
let msg = workers[i].getMessage();
assertTrue(msg === undefined || msg === 42);
// terminate all workers.
workers[i].terminate();
}
}
(function RunTest() {
print(`running ${kNumBatches} batches...`);
let time = performance.now();
for (let i = 0; i < kNumBatches; i++) {
let before = performance.now();
RunWorkerBatch(kBatchSize);
let time = performance.now() - before;
print(`batch ${i+1}, Δ = ${(time).toFixed(3)} ms`);
}
})();
// Copyright 2019 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Flags: --expose-gc
const kBatchSize = 10;
const kNumBatches = 10;
function RunWorkerBatch(count) {
let script = `onmessage =
function(msg) {
if (msg.array) {
msg.array[0] = 99;
postMessage({array : msg.array});
}
}`;
// Launch workers.
let workers = new Array(count);
for (let i = 0; i < count; i++) {
workers[i] = new Worker(script, {type : 'string'});
}
// Send messages.
for (let i = 0; i < workers.length; i++) {
let array = new Int32Array([55, -77]);
workers[i].postMessage({array : array});
// terminate half of the workers early.
if ((i & 1) == 1) workers[i].terminate();
}
// Wait for replies.
for (let i = 0; i < workers.length; i++) {
let msg = workers[i].getMessage();
if (msg !== undefined && msg.array) {
assertInstanceof(msg.array, Int32Array);
assertEquals(99, msg.array[0]);
assertEquals(-77, msg.array[1]);
}
// terminate all workers.
workers[i].terminate();
}
}
(function RunTest() {
print(`running ${kNumBatches} batches...`);
let time = performance.now();
for (let i = 0; i < kNumBatches; i++) {
let before = performance.now();
RunWorkerBatch(kBatchSize);
gc();
let time = performance.now() - before;
print(`batch ${i+1}, Δ = ${(time).toFixed(3)} ms`);
}
})();
// Copyright 2019 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Flags: --expose-gc
let script = `onmessage =
function(msg) {
if (msg.depth > 0) {
print("spawn");
let w = new Worker(msg.script, {type : "string"});
w.postMessage({script: msg.script, depth: msg.depth - 1});
let m = w.getMessage();
w.terminate();
postMessage(m);
} else {
postMessage(-99);
}
}`;
function RunWorker(depth) {
let w = new Worker(script, {type : "string"});
let array = new Int32Array([55, -77]);
w.postMessage({script: script, depth: depth});
let msg = w.getMessage();
print(msg);
w.terminate();
}
function RunTest(depth, iterations) {
let time = performance.now();
for (let i = 0; i < iterations; i++) {
let now = performance.now();
print(`iteration ${i}, Δ = ${(now - time).toFixed(3)} ms`);
RunWorker(depth);
gc();
time = now;
}
}
// TODO(9524): increase the workload of this test. Runs out of threads
// on too many platforms.
RunTest(1, 1);
RunTest(2, 2);
RunTest(5, 3);
RunTest(9, 2);
// Copyright 2019 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
const kBatchSize = 10;
const kNumBatches = 10;
function RunWorkerBatch(count) {
let script = `onmessage =
function(msg) {
if (msg.array) {
msg.array[0] = 99;
postMessage({array : msg.array});
}
}`;
// Launch workers.
let workers = new Array(count);
for (let i = 0; i < count; i++) {
workers[i] = new Worker(script, {type : 'string'});
}
// Send messages.
for (let i = 0; i < workers.length; i++) {
let array = new Int32Array([55, -77]);
workers[i].postMessage({array : array});
// terminate half of the workers early.
if ((i & 1) == 1) workers[i].terminate();
}
// Wait for replies.
for (let i = 0; i < workers.length; i++) {
let msg = workers[i].getMessage();
if (msg !== undefined && msg.array) {
assertInstanceof(msg.array, Int32Array);
assertEquals(99, msg.array[0]);
assertEquals(-77, msg.array[1]);
}
// terminate all workers.
workers[i].terminate();
}
}
(function RunTest() {
print(`running ${kNumBatches} batches...`);
let time = performance.now();
for (let i = 0; i < kNumBatches; i++) {
let before = performance.now();
RunWorkerBatch(kBatchSize);
let time = performance.now() - before;
print(`batch ${i+1}, Δ = ${(time).toFixed(3)} ms`);
}
})();
...@@ -194,6 +194,9 @@ ...@@ -194,6 +194,9 @@
'wasm/compare-exchange-stress': [PASS, SLOW, NO_VARIANTS], 'wasm/compare-exchange-stress': [PASS, SLOW, NO_VARIANTS],
'wasm/compare-exchange64-stress': [PASS, SLOW, NO_VARIANTS], 'wasm/compare-exchange64-stress': [PASS, SLOW, NO_VARIANTS],
# worker creation/shutdown is very slow in debug mode
'd8/d8-worker-shutdown*': [PASS, ['mode == debug', SLOW]],
# case-insensitive unicode regexp relies on case mapping provided by ICU. # case-insensitive unicode regexp relies on case mapping provided by ICU.
'es6/unicode-regexp-ignore-case': [PASS, ['no_i18n == True', FAIL]], 'es6/unicode-regexp-ignore-case': [PASS, ['no_i18n == True', FAIL]],
'es6/unicode-regexp-ignore-case-noi18n': [FAIL, ['no_i18n == True', PASS]], 'es6/unicode-regexp-ignore-case-noi18n': [FAIL, ['no_i18n == True', PASS]],
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment