Commit bd18ee7d authored by Camillo Bruni's avatar Camillo Bruni Committed by V8 LUCI CQ

[d8] Fix worker state race conditions

We clear the worker state in the worker thread after processing
all messages (and getting the terminate signal). This could cause
a race condition when interacting with the worker from the main thread.

This was previously working and broke with https://crrev.com/c/3318669

- Add is_joined_ variable which is mutex guarded
- Simplify Worker::State
- Mutex guard task_runner_ access

Bug: v8:12487
Change-Id: Ib53e5a1a636cb29db50efdb63526b0023a5ea768
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/3345005Reviewed-by: 's avatarLeszek Swirski <leszeks@chromium.org>
Commit-Queue: Camillo Bruni <cbruni@chromium.org>
Cr-Commit-Position: refs/heads/main@{#78415}
parent 5183aaf2
......@@ -4007,8 +4007,8 @@ Worker::Worker(const char* script) : script_(i::StrDup(script)) {
}
Worker::~Worker() {
CHECK(state_.load() == State::kTerminated);
DCHECK_NULL(isolate_);
delete thread_;
thread_ = nullptr;
delete[] script_;
......@@ -4019,16 +4019,15 @@ bool Worker::is_running() const { return state_.load() == State::kRunning; }
bool Worker::StartWorkerThread(std::shared_ptr<Worker> worker) {
auto expected = State::kReady;
CHECK(worker->state_.compare_exchange_strong(expected, State::kRunning));
CHECK(
worker->state_.compare_exchange_strong(expected, State::kPrepareRunning));
auto thread = new WorkerThread(worker);
worker->thread_ = thread;
if (thread->Start()) {
// Wait until the worker is ready to receive messages.
worker->started_semaphore_.Wait();
Shell::AddRunningWorker(std::move(worker));
return true;
}
return false;
if (!thread->Start()) return false;
// Wait until the worker is ready to receive messages.
worker->started_semaphore_.Wait();
Shell::AddRunningWorker(std::move(worker));
return true;
}
void Worker::WorkerThread::Run() {
......@@ -4060,10 +4059,8 @@ class ProcessMessageTask : public i::CancelableTask {
};
void Worker::PostMessage(std::unique_ptr<SerializationData> data) {
if (!is_running()) return;
// Hold the worker_mutex_ so that the worker thread can't delete task_runner_
// after we've checked is_running().
base::MutexGuard lock_guard(&worker_mutex_);
if (!is_running()) return;
std::unique_ptr<v8::Task> task(new ProcessMessageTask(
task_manager_, shared_from_this(), std::move(data)));
task_runner_->PostNonNestableTask(std::move(task));
......@@ -4077,11 +4074,8 @@ class TerminateTask : public i::CancelableTask {
void RunInternal() override {
auto expected = Worker::State::kTerminating;
if (!worker_->state_.compare_exchange_strong(expected,
Worker::State::kTerminated)) {
// Thread was joined in the meantime.
CHECK_EQ(worker_->state_.load(), Worker::State::kTerminatingAndJoining);
}
CHECK(worker_->state_.compare_exchange_strong(expected,
Worker::State::kTerminated));
}
private:
......@@ -4101,34 +4095,19 @@ std::unique_ptr<SerializationData> Worker::GetMessage() {
void Worker::TerminateAndWaitForThread() {
Terminate();
// Don't double-join a terminated thread.
auto expected = State::kTerminating;
if (!state_.compare_exchange_strong(expected,
State::kTerminatingAndJoining)) {
expected = State::kTerminated;
if (!state_.compare_exchange_strong(expected,
State::kTerminatingAndJoining)) {
// Avoid double-joining thread.
DCHECK(state_.load() == State::kTerminatingAndJoining ||
state_.load() == State::kTerminatedAndJoined);
return;
}
{
base::MutexGuard lock_guard(&worker_mutex_);
// Prevent double-joining.
if (is_joined_) return;
is_joined_ = true;
}
thread_->Join();
expected = State::kTerminatingAndJoining;
CHECK(state_.compare_exchange_strong(expected, State::kTerminatedAndJoined));
}
void Worker::Terminate() {
base::MutexGuard lock_guard(&worker_mutex_);
auto expected = State::kRunning;
if (!state_.compare_exchange_strong(expected, State::kTerminating)) return;
// Hold the worker_mutex_ so that the worker thread can't delete task_runner_
// after we've checked state_.
base::MutexGuard lock_guard(&worker_mutex_);
CHECK(state_.load() == State::kTerminating ||
state_.load() == State::kTerminatingAndJoining);
// Post a task to wake up the worker thread.
std::unique_ptr<v8::Task> task(
new TerminateTask(task_manager_, shared_from_this()));
task_runner_->PostTask(std::move(task));
......@@ -4147,9 +4126,7 @@ void Worker::ProcessMessage(std::unique_ptr<SerializationData> data) {
context, String::NewFromUtf8Literal(isolate_, "onmessage",
NewStringType::kInternalized));
Local<Value> onmessage;
if (!maybe_onmessage.ToLocal(&onmessage) || !onmessage->IsFunction()) {
return;
}
if (!maybe_onmessage.ToLocal(&onmessage) || !onmessage->IsFunction()) return;
Local<Function> onmessage_fun = onmessage.As<Function>();
v8::TryCatch try_catch(isolate_);
......@@ -4180,12 +4157,14 @@ void Worker::ExecuteInThread() {
create_params.array_buffer_allocator = Shell::array_buffer_allocator;
create_params.experimental_attach_to_shared_isolate = Shell::shared_isolate;
isolate_ = Isolate::New(create_params);
{
base::MutexGuard lock_guard(&worker_mutex_);
task_runner_ = g_default_platform->GetForegroundTaskRunner(isolate_);
task_manager_ =
reinterpret_cast<i::Isolate*>(isolate_)->cancelable_task_manager();
}
task_runner_ = g_default_platform->GetForegroundTaskRunner(isolate_);
task_manager_ =
reinterpret_cast<i::Isolate*>(isolate_)->cancelable_task_manager();
auto expected = State::kPrepareRunning;
CHECK(state_.compare_exchange_strong(expected, State::kRunning));
// The Worker is now ready to receive messages.
started_semaphore_.Signal();
......@@ -4242,17 +4221,15 @@ void Worker::ExecuteInThread() {
}
Shell::CollectGarbage(isolate_);
}
// TODO(cbruni): Check for unhandled promises here.
{
// Hold the mutex to ensure task_runner_ changes state
// atomically (see Worker::PostMessage which reads them).
base::MutexGuard lock_guard(&worker_mutex_);
// Mark worker as terminated if it's still running.
auto expected = State::kRunning;
state_.compare_exchange_strong(expected, State::kTerminated);
state_.store(State::kTerminated);
CHECK(!is_running());
task_runner_.reset();
task_manager_ = nullptr;
}
context_.Reset();
platform::NotifyIsolateShutdown(g_default_platform, isolate_);
isolate_->Dispose();
......
......@@ -211,11 +211,10 @@ class Worker : public std::enable_shared_from_this<Worker> {
enum class State {
kReady,
kPrepareRunning,
kRunning,
kTerminating,
kTerminated,
kTerminatingAndJoining,
kTerminatedAndJoined
};
bool is_running() const;
......@@ -242,6 +241,7 @@ class Worker : public std::enable_shared_from_this<Worker> {
base::Thread* thread_ = nullptr;
char* script_;
std::atomic<State> state_;
bool is_joined_ = false;
// For signalling that the worker has started.
base::Semaphore started_semaphore_{0};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment