Commit 5fd99133 authored by Shu-yu Guo's avatar Shu-yu Guo Committed by V8 LUCI CQ

[d8] Fixing thread parking in d8

Add ParkedScopes in d8 where it blocks.

Change-Id: I369fbdb361b4e357ff6ceef53fbf52f543979438
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/3704903Reviewed-by: 's avatarCamillo Bruni <cbruni@chromium.org>
Commit-Queue: Michael Lippautz <mlippautz@chromium.org>
Auto-Submit: Shu-yu Guo <syg@chromium.org>
Commit-Queue: Shu-yu Guo <syg@chromium.org>
Reviewed-by: 's avatarMichael Lippautz <mlippautz@chromium.org>
Cr-Commit-Position: refs/heads/main@{#81314}
parent c441e756
......@@ -2682,7 +2682,7 @@ void Shell::WorkerNew(const v8::FunctionCallbackInfo<v8::Value>& args) {
i::Handle<i::Object> managed = i::Managed<Worker>::FromSharedPtr(
i_isolate, kWorkerSizeEstimate, worker);
args.Holder()->SetInternalField(0, Utils::ToLocal(managed));
if (!Worker::StartWorkerThread(std::move(worker))) {
if (!Worker::StartWorkerThread(isolate, std::move(worker))) {
isolate->ThrowError("Can't start thread");
return;
}
......@@ -2723,7 +2723,7 @@ void Shell::WorkerGetMessage(const v8::FunctionCallbackInfo<v8::Value>& args) {
return;
}
std::unique_ptr<SerializationData> data = worker->GetMessage();
std::unique_ptr<SerializationData> data = worker->GetMessage(isolate);
if (data) {
Local<Value> value;
if (Shell::DeserializeValue(isolate, std::move(data)).ToLocal(&value)) {
......@@ -2751,7 +2751,9 @@ void Shell::WorkerTerminateAndWait(
return;
}
worker->TerminateAndWaitForThread();
i::ParkedScope parked(
reinterpret_cast<i::Isolate*>(isolate)->main_thread_local_isolate());
worker->TerminateAndWaitForThread(parked);
}
void Shell::QuitOnce(v8::FunctionCallbackInfo<v8::Value>* args) {
......@@ -4181,9 +4183,8 @@ void SourceGroup::ExecuteInThread() {
for (int i = 0; i < Shell::options.stress_runs; ++i) {
{
i::ParkedScope parked_scope(
next_semaphore_.ParkedWait(
reinterpret_cast<i::Isolate*>(isolate)->main_thread_local_isolate());
next_semaphore_.Wait();
}
{
Isolate::Scope iscope(isolate);
......@@ -4216,12 +4217,13 @@ void SourceGroup::StartExecuteInThread() {
next_semaphore_.Signal();
}
void SourceGroup::WaitForThread() {
void SourceGroup::WaitForThread(const i::ParkedScope& parked) {
if (thread_ == nullptr) return;
done_semaphore_.Wait();
done_semaphore_.ParkedWait(parked);
}
void SourceGroup::JoinThread() {
void SourceGroup::JoinThread(const i::ParkedScope& parked) {
USE(parked);
if (thread_ == nullptr) return;
thread_->Join();
}
......@@ -4266,7 +4268,8 @@ Worker::~Worker() {
bool Worker::is_running() const { return state_.load() == State::kRunning; }
bool Worker::StartWorkerThread(std::shared_ptr<Worker> worker) {
bool Worker::StartWorkerThread(Isolate* requester,
std::shared_ptr<Worker> worker) {
auto expected = State::kReady;
CHECK(
worker->state_.compare_exchange_strong(expected, State::kPrepareRunning));
......@@ -4274,7 +4277,8 @@ bool Worker::StartWorkerThread(std::shared_ptr<Worker> worker) {
worker->thread_ = thread;
if (!thread->Start()) return false;
// Wait until the worker is ready to receive messages.
worker->started_semaphore_.Wait();
worker->started_semaphore_.ParkedWait(
reinterpret_cast<i::Isolate*>(requester)->main_thread_local_isolate());
Shell::AddRunningWorker(std::move(worker));
return true;
}
......@@ -4331,18 +4335,20 @@ class TerminateTask : public i::CancelableTask {
std::shared_ptr<Worker> worker_;
};
std::unique_ptr<SerializationData> Worker::GetMessage() {
std::unique_ptr<SerializationData> Worker::GetMessage(Isolate* requester) {
std::unique_ptr<SerializationData> result;
while (!out_queue_.Dequeue(&result)) {
// If the worker is no longer running, and there are no messages in the
// queue, don't expect any more messages from it.
if (!is_running()) break;
out_semaphore_.Wait();
out_semaphore_.ParkedWait(
reinterpret_cast<i::Isolate*>(requester)->main_thread_local_isolate());
}
return result;
}
void Worker::TerminateAndWaitForThread() {
void Worker::TerminateAndWaitForThread(const i::ParkedScope& parked) {
USE(parked);
Terminate();
{
base::MutexGuard lock_guard(&worker_mutex_);
......@@ -4901,12 +4907,12 @@ int Shell::RunMain(Isolate* isolate, bool last_run) {
for (int i = 1; i < options.num_isolates; ++i) {
if (last_run) {
options.isolate_sources[i].JoinThread();
options.isolate_sources[i].JoinThread(parked);
} else {
options.isolate_sources[i].WaitForThread();
options.isolate_sources[i].WaitForThread(parked);
}
}
WaitForRunningWorkers();
WaitForRunningWorkers(parked);
if (Shell::unhandled_promise_rejections_.load() > 0) {
printf("%i pending unhandled Promise rejection(s) detected.\n",
Shell::unhandled_promise_rejections_.load());
......@@ -5392,7 +5398,7 @@ void Shell::RemoveRunningWorker(const std::shared_ptr<Worker>& worker) {
if (it != running_workers_.end()) running_workers_.erase(it);
}
void Shell::WaitForRunningWorkers() {
void Shell::WaitForRunningWorkers(const i::ParkedScope& parked) {
// Make a copy of running_workers_, because we don't want to call
// Worker::Terminate while holding the workers_mutex_ lock. Otherwise, if a
// worker is about to create a new Worker, it would deadlock.
......@@ -5404,7 +5410,7 @@ void Shell::WaitForRunningWorkers() {
}
for (auto& worker : workers_copy) {
worker->TerminateAndWaitForThread();
worker->TerminateAndWaitForThread(parked);
}
// Now that all workers are terminated, we can re-enable Worker creation.
......
......@@ -21,6 +21,7 @@
#include "src/base/platform/time.h"
#include "src/base/platform/wrappers.h"
#include "src/d8/async-hooks-wrapper.h"
#include "src/heap/parked-scope.h"
#include "src/strings/string-hasher.h"
#include "src/utils/allocation.h"
#include "src/utils/utils.h"
......@@ -107,8 +108,8 @@ class SourceGroup {
bool Execute(Isolate* isolate);
void StartExecuteInThread();
void WaitForThread();
void JoinThread();
void WaitForThread(const i::ParkedScope& parked);
void JoinThread(const i::ParkedScope& parked);
private:
class IsolateThread : public base::Thread {
......@@ -123,8 +124,8 @@ class SourceGroup {
void ExecuteInThread();
base::Semaphore next_semaphore_;
base::Semaphore done_semaphore_;
i::ParkingSemaphore next_semaphore_;
i::ParkingSemaphore done_semaphore_;
base::Thread* thread_;
void ExitShell(int exit_code);
......@@ -199,17 +200,18 @@ class Worker : public std::enable_shared_from_this<Worker> {
// If there are no messages in the queue and the worker is no longer running,
// return nullptr.
// This function should only be called by the thread that created the Worker.
std::unique_ptr<SerializationData> GetMessage();
std::unique_ptr<SerializationData> GetMessage(Isolate* requester);
// Terminate the worker's event loop. Messages from the worker that have been
// queued can still be read via GetMessage().
// This function can be called by any thread.
void Terminate();
// Terminate and join the thread.
// This function can be called by any thread.
void TerminateAndWaitForThread();
void TerminateAndWaitForThread(const i::ParkedScope& parked);
// Start running the given worker in another thread.
static bool StartWorkerThread(std::shared_ptr<Worker> worker);
static bool StartWorkerThread(Isolate* requester,
std::shared_ptr<Worker> worker);
private:
friend class ProcessMessageTask;
......@@ -242,14 +244,14 @@ class Worker : public std::enable_shared_from_this<Worker> {
void ExecuteInThread();
static void PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args);
base::Semaphore out_semaphore_{0};
i::ParkingSemaphore out_semaphore_{0};
SerializationDataQueue out_queue_;
base::Thread* thread_ = nullptr;
char* script_;
std::atomic<State> state_;
bool is_joined_ = false;
// For signalling that the worker has started.
base::Semaphore started_semaphore_{0};
i::ParkingSemaphore started_semaphore_{0};
// For posting tasks to the worker
std::shared_ptr<TaskRunner> task_runner_;
......@@ -701,7 +703,7 @@ class Shell : public i::AllStatic {
}
static bool is_valid_fuzz_script() { return valid_fuzz_script_.load(); }
static void WaitForRunningWorkers();
static void WaitForRunningWorkers(const i::ParkedScope& parked);
static void AddRunningWorker(std::shared_ptr<Worker> worker);
static void RemoveRunningWorker(const std::shared_ptr<Worker>& worker);
......
......@@ -127,6 +127,10 @@ class V8_NODISCARD ParkingConditionVariable final
}
void ParkedWait(LocalHeap* local_heap, base::Mutex* mutex) {
ParkedScope scope(local_heap);
ParkedWait(scope, mutex);
}
void ParkedWait(const ParkedScope& scope, base::Mutex* mutex) {
USE(scope);
Wait(mutex);
}
......@@ -137,6 +141,11 @@ class V8_NODISCARD ParkingConditionVariable final
bool ParkedWaitFor(LocalHeap* local_heap, base::Mutex* mutex,
const base::TimeDelta& rel_time) V8_WARN_UNUSED_RESULT {
ParkedScope scope(local_heap);
return ParkedWaitFor(scope, mutex, rel_time);
}
bool ParkedWaitFor(const ParkedScope& scope, base::Mutex* mutex,
const base::TimeDelta& rel_time) V8_WARN_UNUSED_RESULT {
USE(scope);
return WaitFor(mutex, rel_time);
}
......@@ -158,6 +167,10 @@ class V8_NODISCARD ParkingSemaphore final : public base::Semaphore {
}
void ParkedWait(LocalHeap* local_heap) {
ParkedScope scope(local_heap);
ParkedWait(scope);
}
void ParkedWait(const ParkedScope& scope) {
USE(scope);
Wait();
}
......@@ -168,6 +181,11 @@ class V8_NODISCARD ParkingSemaphore final : public base::Semaphore {
bool ParkedWaitFor(LocalHeap* local_heap,
const base::TimeDelta& rel_time) V8_WARN_UNUSED_RESULT {
ParkedScope scope(local_heap);
return ParkedWaitFor(scope, rel_time);
}
bool ParkedWaitFor(const ParkedScope& scope,
const base::TimeDelta& rel_time) {
USE(scope);
return WaitFor(rel_time);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment