Commit 14d6d58a authored by Camillo Bruni's avatar Camillo Bruni Committed by V8 LUCI CQ

[d8] Use explicit variable to track Worker state

The explicit state_ variable is used to prevent undefined behaviour
by double-joining the worker thread.

Bug: chromium:1276382
Change-Id: I338cfdb4a587eb57fec5a5a28b42371584c99102
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/3318669Reviewed-by: 's avatarMarja Hölttä <marja@chromium.org>
Commit-Queue: Camillo Bruni <cbruni@chromium.org>
Cr-Commit-Position: refs/heads/main@{#78300}
parent 91f08378
......@@ -2550,10 +2550,7 @@ void Shell::WorkerTerminate(const v8::FunctionCallbackInfo<v8::Value>& args) {
HandleScope handle_scope(isolate);
std::shared_ptr<Worker> worker =
GetWorkerFromInternalField(isolate, args.Holder());
if (!worker.get()) {
return;
}
if (!worker.get()) return;
worker->Terminate();
}
......@@ -4000,7 +3997,7 @@ void SerializationDataQueue::Clear() {
}
Worker::Worker(const char* script) : script_(i::StrDup(script)) {
running_.store(false);
state_.store(State::kReady);
}
Worker::~Worker() {
......@@ -4012,8 +4009,11 @@ Worker::~Worker() {
script_ = nullptr;
}
bool Worker::is_running() const { return state_.load() == State::kRunning; }
bool Worker::StartWorkerThread(std::shared_ptr<Worker> worker) {
worker->running_.store(true);
auto expected = State::kReady;
CHECK(worker->state_.compare_exchange_strong(expected, State::kRunning));
auto thread = new WorkerThread(worker);
worker->thread_ = thread;
if (thread->Start()) {
......@@ -4054,12 +4054,10 @@ class ProcessMessageTask : public i::CancelableTask {
};
void Worker::PostMessage(std::unique_ptr<SerializationData> data) {
if (!is_running()) return;
// Hold the worker_mutex_ so that the worker thread can't delete task_runner_
// after we've checked running_.
// after we've checked is_running().
base::MutexGuard lock_guard(&worker_mutex_);
if (!running_.load()) {
return;
}
std::unique_ptr<v8::Task> task(new ProcessMessageTask(
task_manager_, shared_from_this(), std::move(data)));
task_runner_->PostNonNestableTask(std::move(task));
......@@ -4072,9 +4070,12 @@ class TerminateTask : public i::CancelableTask {
: i::CancelableTask(task_manager), worker_(worker) {}
void RunInternal() override {
// Make sure the worker doesn't enter the task loop after processing this
// task.
worker_->running_.store(false);
auto expected = Worker::State::kTerminating;
if (!worker_->state_.compare_exchange_strong(expected,
Worker::State::kTerminated)) {
// Thread was joined in the meantime.
CHECK_EQ(worker_->state_.load(), Worker::State::kTerminatingAndJoining);
}
}
private:
......@@ -4086,37 +4087,49 @@ std::unique_ptr<SerializationData> Worker::GetMessage() {
while (!out_queue_.Dequeue(&result)) {
// If the worker is no longer running, and there are no messages in the
// queue, don't expect any more messages from it.
if (!running_.load()) {
break;
}
if (!is_running()) break;
out_semaphore_.Wait();
}
return result;
}
void Worker::TerminateAndWaitForThread() {
Terminate();
// Don't double-join a terminated thread.
auto expected = State::kTerminating;
if (!state_.compare_exchange_strong(expected,
State::kTerminatingAndJoining)) {
expected = State::kTerminated;
if (!state_.compare_exchange_strong(expected,
State::kTerminatingAndJoining)) {
// Avoid double-joining thread.
DCHECK(state_.load() == State::kTerminatingAndJoining ||
state_.load() == State::kTerminatedAndJoined);
return;
}
}
thread_->Join();
expected = State::kTerminatingAndJoining;
CHECK(state_.compare_exchange_strong(expected, State::kTerminatedAndJoined));
}
void Worker::Terminate() {
auto expected = State::kRunning;
if (!state_.compare_exchange_strong(expected, State::kTerminating)) return;
// Hold the worker_mutex_ so that the worker thread can't delete task_runner_
// after we've checked running_.
// after we've checked state_.
base::MutexGuard lock_guard(&worker_mutex_);
if (!running_.load()) {
return;
}
CHECK(state_.load() == State::kTerminating ||
state_.load() == State::kTerminatingAndJoining);
// Post a task to wake up the worker thread.
std::unique_ptr<v8::Task> task(
new TerminateTask(task_manager_, shared_from_this()));
task_runner_->PostTask(std::move(task));
}
void Worker::TerminateAndWaitForThread() {
Terminate();
thread_->Join();
}
void Worker::ProcessMessage(std::unique_ptr<SerializationData> data) {
if (!running_.load()) {
return;
}
if (!is_running()) return;
DCHECK_NOT_NULL(isolate_);
HandleScope scope(isolate_);
Local<Context> context = context_.Get(isolate_);
......@@ -4147,10 +4160,10 @@ void Worker::ProcessMessages() {
i::Isolate* i_isolate = reinterpret_cast<i::Isolate*>(isolate_);
i::SaveAndSwitchContext saved_context(i_isolate, i::Context());
SealHandleScope shs(isolate_);
while (running_.load() && v8::platform::PumpMessageLoop(
g_default_platform, isolate_,
platform::MessageLoopBehavior::kWaitForWork)) {
if (running_.load()) {
while (is_running() && v8::platform::PumpMessageLoop(
g_default_platform, isolate_,
platform::MessageLoopBehavior::kWaitForWork)) {
if (is_running()) {
MicrotasksScope::PerformCheckpoint(isolate_);
}
}
......@@ -4225,10 +4238,12 @@ void Worker::ExecuteInThread() {
}
// TODO(cbruni): Check for unhandled promises here.
{
// Hold the mutex to ensure running_ and task_runner_ change state
// Hold the mutex to ensure task_runner_ changes state
// atomically (see Worker::PostMessage which reads them).
base::MutexGuard lock_guard(&worker_mutex_);
running_.store(false);
// Mark worker as terminated if it's still running.
auto expected = State::kRunning;
state_.compare_exchange_strong(expected, State::kTerminated);
task_runner_.reset();
task_manager_ = nullptr;
}
......
......@@ -209,6 +209,16 @@ class Worker : public std::enable_shared_from_this<Worker> {
friend class ProcessMessageTask;
friend class TerminateTask;
enum class State {
kReady,
kRunning,
kTerminating,
kTerminated,
kTerminatingAndJoining,
kTerminatedAndJoined
};
bool is_running() const;
void ProcessMessage(std::unique_ptr<SerializationData> data);
void ProcessMessages();
......@@ -231,7 +241,7 @@ class Worker : public std::enable_shared_from_this<Worker> {
SerializationDataQueue out_queue_;
base::Thread* thread_ = nullptr;
char* script_;
std::atomic<bool> running_;
std::atomic<State> state_;
// For signalling that the worker has started.
base::Semaphore started_semaphore_{0};
......
......@@ -27,7 +27,11 @@ function RunWorkerBatch(count) {
let array = new Int32Array([55, -77]);
workers[i].postMessage({array : array});
// terminate half of the workers early.
if ((i & 1) == 1) workers[i].terminate();
if ((i & 1) == 1) {
workers[i].terminate();
// A second terminate should have no effect.
workers[i].terminate();
}
}
// Wait for replies.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment