Commit e25fcd26 authored by Maya Lekova's avatar Maya Lekova Committed by Commit Bot

Revert "[d8] Remove maximum workers limitation"

This reverts commit a0728e86.

Reason for revert: Times out on Windows & debug builds - https://ci.chromium.org/p/v8/builders/ci/V8%20Win32%20-%20nosnap%20-%20shared/34484

Original change's description:
> [d8] Remove maximum workers limitation
> 
> This CL refactors the lifetime management of the v8::Worker C++ object
> and in the process lifts the 100 maximum worker limitation. To do this,
> it uses a Managed<v8::Worker> heap object and attaches the managed to
> the API worker object.
> 
> R=​mstarzinger@chromium.org
> BUG=v8:9524
> 
> Change-Id: I279b7aeb6645a87f9108ee6f572105739721cef4
> Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/1715453
> Commit-Queue: Ben Titzer <titzer@chromium.org>
> Reviewed-by: Clemens Hammacher <clemensh@chromium.org>
> Reviewed-by: Michael Starzinger <mstarzinger@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#62932}

TBR=mstarzinger@chromium.org,titzer@chromium.org,clemensh@chromium.org

Change-Id: I3a27937cba13b5413390f49268a107c184515153
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: v8:9524
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/1720590Reviewed-by: 's avatarMaya Lekova <mslekova@chromium.org>
Commit-Queue: Maya Lekova <mslekova@chromium.org>
Cr-Commit-Position: refs/heads/master@{#62937}
parent f47cbb28
...@@ -67,8 +67,6 @@ class V8_BASE_EXPORT Mutex final { ...@@ -67,8 +67,6 @@ class V8_BASE_EXPORT Mutex final {
return native_handle_; return native_handle_;
} }
V8_INLINE void AssertHeld() { DCHECK_EQ(1, level_); }
private: private:
NativeHandle native_handle_; NativeHandle native_handle_;
#ifdef DEBUG #ifdef DEBUG
......
...@@ -36,7 +36,6 @@ ...@@ -36,7 +36,6 @@
#include "src/init/v8.h" #include "src/init/v8.h"
#include "src/interpreter/interpreter.h" #include "src/interpreter/interpreter.h"
#include "src/logging/counters.h" #include "src/logging/counters.h"
#include "src/objects/managed.h"
#include "src/objects/objects-inl.h" #include "src/objects/objects-inl.h"
#include "src/objects/objects.h" #include "src/objects/objects.h"
#include "src/parsing/parse-info.h" #include "src/parsing/parse-info.h"
...@@ -77,6 +76,7 @@ namespace { ...@@ -77,6 +76,7 @@ namespace {
const int kMB = 1024 * 1024; const int kMB = 1024 * 1024;
const int kMaxWorkers = 100;
const int kMaxSerializerMemoryUsage = const int kMaxSerializerMemoryUsage =
1 * kMB; // Arbitrary maximum for testing. 1 * kMB; // Arbitrary maximum for testing.
...@@ -227,13 +227,14 @@ Worker* GetWorkerFromInternalField(Isolate* isolate, Local<Object> object) { ...@@ -227,13 +227,14 @@ Worker* GetWorkerFromInternalField(Isolate* isolate, Local<Object> object) {
return nullptr; return nullptr;
} }
i::Handle<i::Object> handle = Utils::OpenHandle(*object->GetInternalField(0)); Worker* worker =
if (handle->IsSmi()) { static_cast<Worker*>(object->GetAlignedPointerFromInternalField(0));
if (worker == nullptr) {
Throw(isolate, "Worker is defunct because main thread is terminating"); Throw(isolate, "Worker is defunct because main thread is terminating");
return nullptr; return nullptr;
} }
auto managed = i::Handle<i::Managed<Worker>>::cast(handle);
return managed->raw(); return worker;
} }
base::Thread::Options GetThreadOptions(const char* name) { base::Thread::Options GetThreadOptions(const char* name) {
...@@ -332,7 +333,7 @@ const base::TimeTicks Shell::kInitialTicks = ...@@ -332,7 +333,7 @@ const base::TimeTicks Shell::kInitialTicks =
Global<Function> Shell::stringify_function_; Global<Function> Shell::stringify_function_;
base::LazyMutex Shell::workers_mutex_; base::LazyMutex Shell::workers_mutex_;
bool Shell::allow_new_workers_ = true; bool Shell::allow_new_workers_ = true;
std::unordered_set<std::shared_ptr<Worker>> Shell::running_workers_; std::vector<Worker*> Shell::workers_;
std::vector<ExternalizedContents> Shell::externalized_contents_; std::vector<ExternalizedContents> Shell::externalized_contents_;
std::atomic<bool> Shell::script_executed_{false}; std::atomic<bool> Shell::script_executed_{false};
base::LazyMutex Shell::isolate_status_lock_; base::LazyMutex Shell::isolate_status_lock_;
...@@ -1391,33 +1392,30 @@ void Shell::WorkerNew(const v8::FunctionCallbackInfo<v8::Value>& args) { ...@@ -1391,33 +1392,30 @@ void Shell::WorkerNew(const v8::FunctionCallbackInfo<v8::Value>& args) {
return; return;
} }
// Initialize the embedder field to 0; if we return early without
// creating a new Worker (because the main thread is terminating) we can
// early-out from the instance calls.
args.Holder()->SetInternalField(0, v8::Integer::New(isolate, 0));
{ {
// Don't allow workers to create more workers if the main thread
// is waiting for existing running workers to terminate.
base::MutexGuard lock_guard(workers_mutex_.Pointer()); base::MutexGuard lock_guard(workers_mutex_.Pointer());
if (workers_.size() >= kMaxWorkers) {
Throw(args.GetIsolate(), "Too many workers, I won't let you create more");
return;
}
// Initialize the embedder field to nullptr; if we return early without
// creating a new Worker (because the main thread is terminating) we can
// early-out from the instance calls.
args.Holder()->SetAlignedPointerInInternalField(0, nullptr);
if (!allow_new_workers_) return; if (!allow_new_workers_) return;
Worker* worker = new Worker;
args.Holder()->SetAlignedPointerInInternalField(0, worker);
workers_.push_back(worker);
String::Utf8Value script(args.GetIsolate(), source); String::Utf8Value script(args.GetIsolate(), source);
if (!*script) { if (!*script) {
Throw(args.GetIsolate(), "Can't get worker script"); Throw(args.GetIsolate(), "Can't get worker script");
return; return;
} }
worker->StartExecuteInThread(*script);
// The C++ worker object's lifetime is shared between the Managed<Worker>
// object on the heap, which the JavaScript object points to, and an
// internal std::shared_ptr in the worker thread itself.
auto worker = std::make_shared<Worker>(*script);
i::Isolate* i_isolate = reinterpret_cast<i::Isolate*>(isolate);
const size_t kWorkerSizeEstimate = 4 * 1024 * 1024; // stack + heap.
i::Handle<i::Object> managed = i::Managed<Worker>::FromSharedPtr(
i_isolate, kWorkerSizeEstimate, worker);
args.Holder()->SetInternalField(0, Utils::ToLocal(managed));
Worker::StartWorkerThread(std::move(worker));
} }
} }
...@@ -1477,7 +1475,7 @@ void Shell::QuitOnce(v8::FunctionCallbackInfo<v8::Value>* args) { ...@@ -1477,7 +1475,7 @@ void Shell::QuitOnce(v8::FunctionCallbackInfo<v8::Value>* args) {
int exit_code = (*args)[0] int exit_code = (*args)[0]
->Int32Value(args->GetIsolate()->GetCurrentContext()) ->Int32Value(args->GetIsolate()->GetCurrentContext())
.FromMaybe(0); .FromMaybe(0);
WaitForRunningWorkers(); CleanupWorkers();
args->GetIsolate()->Exit(); args->GetIsolate()->Exit();
OnExit(args->GetIsolate()); OnExit(args->GetIsolate());
base::OS::ExitProcess(exit_code); base::OS::ExitProcess(exit_code);
...@@ -2552,11 +2550,11 @@ void SerializationDataQueue::Clear() { ...@@ -2552,11 +2550,11 @@ void SerializationDataQueue::Clear() {
data_.clear(); data_.clear();
} }
Worker::Worker(const char* script) Worker::Worker()
: in_semaphore_(0), : in_semaphore_(0),
out_semaphore_(0), out_semaphore_(0),
thread_(nullptr), thread_(nullptr),
script_(i::StrDup(script)), script_(nullptr),
running_(false) {} running_(false) {}
Worker::~Worker() { Worker::~Worker() {
...@@ -2564,26 +2562,15 @@ Worker::~Worker() { ...@@ -2564,26 +2562,15 @@ Worker::~Worker() {
thread_ = nullptr; thread_ = nullptr;
delete[] script_; delete[] script_;
script_ = nullptr; script_ = nullptr;
in_queue_.Clear();
out_queue_.Clear();
} }
void Worker::StartWorkerThread(std::shared_ptr<Worker> worker) { void Worker::StartExecuteInThread(const char* script) {
worker->running_ = true; running_ = true;
auto thread = new WorkerThread(worker); script_ = i::StrDup(script);
worker->thread_ = thread; thread_ = new WorkerThread(this);
thread->Start(); thread_->Start();
Shell::AddRunningWorker(std::move(worker));
}
void Worker::WorkerThread::Run() {
// Prevent a lifetime cycle from Worker -> WorkerThread -> Worker.
// We must clear the worker_ field of the thread, but we keep the
// worker alive via a stack root until the thread finishes execution
// and removes itself from the running set. Thereafter the only
// remaining reference can be from a JavaScript object via a Managed.
auto worker = std::move(worker_);
worker_ = nullptr;
worker->ExecuteInThread();
Shell::RemoveRunningWorker(worker);
} }
void Worker::PostMessage(std::unique_ptr<SerializationData> data) { void Worker::PostMessage(std::unique_ptr<SerializationData> data) {
...@@ -2949,7 +2936,7 @@ int Shell::RunMain(Isolate* isolate, int argc, char* argv[], bool last_run) { ...@@ -2949,7 +2936,7 @@ int Shell::RunMain(Isolate* isolate, int argc, char* argv[], bool last_run) {
options.isolate_sources[i].WaitForThread(); options.isolate_sources[i].WaitForThread();
} }
} }
WaitForRunningWorkers(); CleanupWorkers();
// In order to finish successfully, success must be != expected_to_throw. // In order to finish successfully, success must be != expected_to_throw.
return success == Shell::options.expected_to_throw ? 1 : 0; return success == Shell::options.expected_to_throw ? 1 : 0;
} }
...@@ -3280,35 +3267,24 @@ MaybeLocal<Value> Shell::DeserializeValue( ...@@ -3280,35 +3267,24 @@ MaybeLocal<Value> Shell::DeserializeValue(
return deserializer.ReadValue(context); return deserializer.ReadValue(context);
} }
void Shell::AddRunningWorker(std::shared_ptr<Worker> worker) { void Shell::CleanupWorkers() {
workers_mutex_.Pointer()->AssertHeld(); // caller should hold the mutex. // Make a copy of workers_, because we don't want to call Worker::Terminate
running_workers_.insert(worker); // while holding the workers_mutex_ lock. Otherwise, if a worker is about to
} // create a new Worker, it would deadlock.
std::vector<Worker*> workers_copy;
void Shell::RemoveRunningWorker(const std::shared_ptr<Worker>& worker) {
base::MutexGuard lock_guard(workers_mutex_.Pointer());
auto it = running_workers_.find(worker);
if (it != running_workers_.end()) running_workers_.erase(it);
}
void Shell::WaitForRunningWorkers() {
// Make a copy of running_workers_, because we don't want to call
// Worker::Terminate while holding the workers_mutex_ lock. Otherwise, if a
// worker is about to create a new Worker, it would deadlock.
std::unordered_set<std::shared_ptr<Worker>> workers_copy;
{ {
base::MutexGuard lock_guard(workers_mutex_.Pointer()); base::MutexGuard lock_guard(workers_mutex_.Pointer());
allow_new_workers_ = false; allow_new_workers_ = false;
workers_copy.swap(running_workers_); workers_copy.swap(workers_);
} }
for (auto& worker : workers_copy) { for (Worker* worker : workers_copy) {
worker->WaitForThread(); worker->WaitForThread();
delete worker;
} }
// Now that all workers are terminated, we can re-enable Worker creation. // Now that all workers are terminated, we can re-enable Worker creation.
base::MutexGuard lock_guard(workers_mutex_.Pointer()); base::MutexGuard lock_guard(workers_mutex_.Pointer());
DCHECK(running_workers_.empty());
allow_new_workers_ = true; allow_new_workers_ = true;
externalized_contents_.clear(); externalized_contents_.clear();
} }
......
...@@ -11,7 +11,6 @@ ...@@ -11,7 +11,6 @@
#include <queue> #include <queue>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <unordered_set>
#include <vector> #include <vector>
#include "src/base/once.h" #include "src/base/once.h"
...@@ -208,9 +207,12 @@ class SerializationDataQueue { ...@@ -208,9 +207,12 @@ class SerializationDataQueue {
class Worker { class Worker {
public: public:
explicit Worker(const char* script); Worker();
~Worker(); ~Worker();
// Run the given script on this Worker. This function should only be called
// once, and should only be called by the thread that created the Worker.
void StartExecuteInThread(const char* script);
// Post a message to the worker's incoming message queue. The worker will // Post a message to the worker's incoming message queue. The worker will
// take ownership of the SerializationData. // take ownership of the SerializationData.
// This function should only be called by the thread that created the Worker. // This function should only be called by the thread that created the Worker.
...@@ -229,20 +231,17 @@ class Worker { ...@@ -229,20 +231,17 @@ class Worker {
// This function can be called by any thread. // This function can be called by any thread.
void WaitForThread(); void WaitForThread();
// Start running the given worker in another thread.
static void StartWorkerThread(std::shared_ptr<Worker> worker);
private: private:
class WorkerThread : public base::Thread { class WorkerThread : public base::Thread {
public: public:
explicit WorkerThread(std::shared_ptr<Worker> worker) explicit WorkerThread(Worker* worker)
: base::Thread(base::Thread::Options("WorkerThread")), : base::Thread(base::Thread::Options("WorkerThread")),
worker_(std::move(worker)) {} worker_(worker) {}
void Run() override; void Run() override { worker_->ExecuteInThread(); }
private: private:
std::shared_ptr<Worker> worker_; Worker* worker_;
}; };
void ExecuteInThread(); void ExecuteInThread();
...@@ -379,6 +378,7 @@ class Shell : public i::AllStatic { ...@@ -379,6 +378,7 @@ class Shell : public i::AllStatic {
Isolate* isolate, Local<Value> value, Local<Value> transfer); Isolate* isolate, Local<Value> value, Local<Value> transfer);
static MaybeLocal<Value> DeserializeValue( static MaybeLocal<Value> DeserializeValue(
Isolate* isolate, std::unique_ptr<SerializationData> data); Isolate* isolate, std::unique_ptr<SerializationData> data);
static void CleanupWorkers();
static int* LookupCounter(const char* name); static int* LookupCounter(const char* name);
static void* CreateHistogram(const char* name, int min, int max, static void* CreateHistogram(const char* name, int min, int max,
size_t buckets); size_t buckets);
...@@ -493,10 +493,6 @@ class Shell : public i::AllStatic { ...@@ -493,10 +493,6 @@ class Shell : public i::AllStatic {
!options.test_shell; !options.test_shell;
} }
static void WaitForRunningWorkers();
static void AddRunningWorker(std::shared_ptr<Worker> worker);
static void RemoveRunningWorker(const std::shared_ptr<Worker>& worker);
private: private:
static Global<Context> evaluation_context_; static Global<Context> evaluation_context_;
static base::OnceType quit_once_; static base::OnceType quit_once_;
...@@ -513,7 +509,7 @@ class Shell : public i::AllStatic { ...@@ -513,7 +509,7 @@ class Shell : public i::AllStatic {
static base::LazyMutex workers_mutex_; // Guards the following members. static base::LazyMutex workers_mutex_; // Guards the following members.
static bool allow_new_workers_; static bool allow_new_workers_;
static std::unordered_set<std::shared_ptr<Worker>> running_workers_; static std::vector<Worker*> workers_;
static std::vector<ExternalizedContents> externalized_contents_; static std::vector<ExternalizedContents> externalized_contents_;
// Multiple isolates may update this flag concurrently. // Multiple isolates may update this flag concurrently.
......
// Copyright 2019 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
const kBatchSize = 10;
const kNumBatches = 10;
function RunWorkerBatch(count) {
let script = `postMessage(42)`;
// Launch workers.
let workers = new Array(count);
for (let i = 0; i < count; i++) {
workers[i] = new Worker(script, {type : 'string'});
}
// Terminate half of the workers early.
for (let i = 0; i < workers.length; i++) {
if ((i & 1) == 1) workers[i].terminate();
}
// Get messages from some workers.
for (let i = 0; i < workers.length; i++) {
let msg = workers[i].getMessage();
assertTrue(msg === undefined || msg === 42);
// terminate all workers.
workers[i].terminate();
}
}
(function RunTest() {
print(`running ${kNumBatches} batches...`);
let time = performance.now();
for (let i = 0; i < kNumBatches; i++) {
let before = performance.now();
RunWorkerBatch(kBatchSize);
let time = performance.now() - before;
print(`batch ${i+1}, Δ = ${(time).toFixed(3)} ms`);
}
})();
// Copyright 2019 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Flags: --expose-gc
const kBatchSize = 10;
const kNumBatches = 10;
function RunWorkerBatch(count) {
let script = `onmessage =
function(msg) {
if (msg.array) {
msg.array[0] = 99;
postMessage({array : msg.array});
}
}`;
// Launch workers.
let workers = new Array(count);
for (let i = 0; i < count; i++) {
workers[i] = new Worker(script, {type : 'string'});
}
// Send messages.
for (let i = 0; i < workers.length; i++) {
let array = new Int32Array([55, -77]);
workers[i].postMessage({array : array});
// terminate half of the workers early.
if ((i & 1) == 1) workers[i].terminate();
}
// Wait for replies.
for (let i = 0; i < workers.length; i++) {
let msg = workers[i].getMessage();
if (msg !== undefined && msg.array) {
assertInstanceof(msg.array, Int32Array);
assertEquals(99, msg.array[0]);
assertEquals(-77, msg.array[1]);
}
// terminate all workers.
workers[i].terminate();
}
}
(function RunTest() {
print(`running ${kNumBatches} batches...`);
let time = performance.now();
for (let i = 0; i < kNumBatches; i++) {
let before = performance.now();
RunWorkerBatch(kBatchSize);
gc();
let time = performance.now() - before;
print(`batch ${i+1}, Δ = ${(time).toFixed(3)} ms`);
}
})();
// Copyright 2019 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Flags: --expose-gc
let script = `onmessage =
function(msg) {
if (msg.depth > 0) {
print("spawn");
let w = new Worker(msg.script, {type : "string"});
w.postMessage({script: msg.script, depth: msg.depth - 1});
let m = w.getMessage();
w.terminate();
postMessage(m);
} else {
postMessage(-99);
}
}`;
function RunWorker(depth) {
let w = new Worker(script, {type : "string"});
let array = new Int32Array([55, -77]);
w.postMessage({script: script, depth: depth});
let msg = w.getMessage();
print(msg);
w.terminate();
}
function RunTest(depth, iterations) {
let time = performance.now();
for (let i = 0; i < iterations; i++) {
let now = performance.now();
print(`iteration ${i}, Δ = ${(now - time).toFixed(3)} ms`);
RunWorker(depth);
gc();
time = now;
}
}
// TODO(9524): increase the workload of this test. Runs out of threads
// on too many platforms.
RunTest(1, 1);
RunTest(2, 2);
RunTest(5, 3);
RunTest(9, 2);
// Copyright 2019 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
const kBatchSize = 10;
const kNumBatches = 10;
function RunWorkerBatch(count) {
let script = `onmessage =
function(msg) {
if (msg.array) {
msg.array[0] = 99;
postMessage({array : msg.array});
}
}`;
// Launch workers.
let workers = new Array(count);
for (let i = 0; i < count; i++) {
workers[i] = new Worker(script, {type : 'string'});
}
// Send messages.
for (let i = 0; i < workers.length; i++) {
let array = new Int32Array([55, -77]);
workers[i].postMessage({array : array});
// terminate half of the workers early.
if ((i & 1) == 1) workers[i].terminate();
}
// Wait for replies.
for (let i = 0; i < workers.length; i++) {
let msg = workers[i].getMessage();
if (msg !== undefined && msg.array) {
assertInstanceof(msg.array, Int32Array);
assertEquals(99, msg.array[0]);
assertEquals(-77, msg.array[1]);
}
// terminate all workers.
workers[i].terminate();
}
}
(function RunTest() {
print(`running ${kNumBatches} batches...`);
let time = performance.now();
for (let i = 0; i < kNumBatches; i++) {
let before = performance.now();
RunWorkerBatch(kBatchSize);
let time = performance.now() - before;
print(`batch ${i+1}, Δ = ${(time).toFixed(3)} ms`);
}
})();
...@@ -194,9 +194,6 @@ ...@@ -194,9 +194,6 @@
'wasm/compare-exchange-stress': [PASS, SLOW, NO_VARIANTS], 'wasm/compare-exchange-stress': [PASS, SLOW, NO_VARIANTS],
'wasm/compare-exchange64-stress': [PASS, SLOW, NO_VARIANTS], 'wasm/compare-exchange64-stress': [PASS, SLOW, NO_VARIANTS],
# worker creation/shutdown is very slow in debug mode
'd8/d8-worker-shutdown*': [PASS, ['mode == debug', SLOW]],
# case-insensitive unicode regexp relies on case mapping provided by ICU. # case-insensitive unicode regexp relies on case mapping provided by ICU.
'es6/unicode-regexp-ignore-case': [PASS, ['no_i18n == True', FAIL]], 'es6/unicode-regexp-ignore-case': [PASS, ['no_i18n == True', FAIL]],
'es6/unicode-regexp-ignore-case-noi18n': [FAIL, ['no_i18n == True', PASS]], 'es6/unicode-regexp-ignore-case-noi18n': [FAIL, ['no_i18n == True', PASS]],
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment