Commit 064ee3c8 authored by Etienne Pierre-Doray, committed by Commit Bot

Reland "[wasm]: Use CancelAndDetach and barrier on BackgroundCompileJob."

Reason for the original revert: data race:
https://ci.chromium.org/p/v8/builders/ci/V8%20Linux64%20TSAN/34121

It was assumed that MockPlatform runs everything on one thread. However,
MockPlatform::PostJob previously scheduled the job through TestPlatform,
which eventually posts concurrent tasks, causing the data race.
Fix: manually calling NewDefaultJobHandle and passing the MockPlatform
itself ensures that the jobs also run sequentially.
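
For reference, a minimal sketch of the fixed override, condensed from the
test changes below (the actual tests additionally wrap the returned handle
in a MockJobHandle for bookkeeping):

  std::unique_ptr<v8::JobHandle> PostJob(
      v8::TaskPriority priority,
      std::unique_ptr<v8::JobTask> job_task) override {
    // Create the job handle directly on this MockPlatform and limit it to a
    // single worker, so its tasks run sequentially instead of being
    // forwarded to TestPlatform's concurrent worker pool.
    return v8::platform::NewDefaultJobHandle(this, priority,
                                             std::move(job_task), 1);
  }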

Additional change:
- CancelAndDetach is now called in ~CompilationStateImpl() to make sure
it is called in sequence with ScheduleCompileJobForNewUnits().

Original CL description:
To avoid keeping around a list of job handles, CancelAndDetach() is
used in CancelCompilation. Dependency on WasmEngine is handled by a
barrier that waits on all jobs to finish.
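
Condensed from the changes below (a sketch of the relevant pieces, not
standalone code), the resulting lifetime scheme is roughly:

  // WasmEngine: one barrier shared by all background compile jobs.
  std::shared_ptr<OperationsBarrier> operations_barrier_{
      std::make_shared<OperationsBarrier>()};

  // BackgroundCompileJob::Run: only compile while the engine is alive.
  void Run(JobDelegate* delegate) override {
    auto engine_scope = engine_barrier_->TryLock();
    if (!engine_scope) return;  // The engine is shutting down.
    ExecuteCompilationUnits(native_module_, engine_, async_counters_.get(),
                            delegate, kBaselineOrTopTier);
  }

  // ~CompilationStateImpl: give up the compile job without waiting for it.
  if (current_compile_job_ && current_compile_job_->IsValid()) {
    current_compile_job_->CancelAndDetach();
  }

  // ~WasmEngine: wait for all still-running compile jobs to finish.
  operations_barrier_->CancelAndWait();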

Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2498659
Commit-Queue: Jakob Kummerow <jkummerow@chromium.org>
Reviewed-by: Ulan Degenbaev <ulan@chromium.org>
Reviewed-by: Clemens Backes <clemensb@chromium.org>
Reviewed-by: Jakob Kummerow <jkummerow@chromium.org>
Cr-Original-Commit-Position: refs/heads/master@{#71074}
Change-Id: Ie9556f7f96f6fb9a61ada0e5cbd58d4fb4a0f571
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2559137
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: Andreas Haas <ahaas@chromium.org>
Cr-Commit-Position: refs/heads/master@{#71459}
parent c1b9e690
......@@ -3348,6 +3348,8 @@ v8_source_set("v8_base_without_compiler") {
"src/strings/uri.h",
"src/tasks/cancelable-task.cc",
"src/tasks/cancelable-task.h",
"src/tasks/operations-barrier.cc",
"src/tasks/operations-barrier.h",
"src/tasks/task-utils.cc",
"src/tasks/task-utils.h",
"src/third_party/siphash/halfsiphash.cc",
......
......@@ -123,7 +123,6 @@ void DefaultJobState::CancelAndWait() {
}
void DefaultJobState::CancelAndDetach() {
base::MutexGuard guard(&mutex_);
is_canceled_.store(true, std::memory_order_relaxed);
}
......
// Copyright 2020 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/tasks/operations-barrier.h"
namespace v8 {
namespace internal {
OperationsBarrier::Token OperationsBarrier::TryLock() {
base::MutexGuard guard(&mutex_);
if (cancelled_) return Token(nullptr);
++operations_count_;
return Token(this);
}
void OperationsBarrier::CancelAndWait() {
base::MutexGuard guard(&mutex_);
DCHECK(!cancelled_);
cancelled_ = true;
while (operations_count_ > 0) {
release_condition_.Wait(&mutex_);
}
}
void OperationsBarrier::Release() {
base::MutexGuard guard(&mutex_);
if (--operations_count_ == 0 && cancelled_) {
release_condition_.NotifyOne();
}
}
} // namespace internal
} // namespace v8
// Copyright 2020 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef V8_TASKS_OPERATIONS_BARRIER_H_
#define V8_TASKS_OPERATIONS_BARRIER_H_
#include <cstdint>
#include "src/base/macros.h"
#include "src/base/platform/condition-variable.h"
#include "src/base/platform/mutex.h"
namespace v8 {
namespace internal {
// A thread-safe barrier to manage the lifetime of multi-threaded operations.
//
// The barrier is used to determine if operations are allowed, and to keep track
// of how many are currently active. Users will call TryLock() before starting
// such operations. If the call succeeds the user can run the operation and the
// barrier will keep track of it until the user signals that the operation is
// completed. No operations are allowed after CancelAndWait() is called.
//
// There is no explicit way of telling the barrier when an operation is
// completed; instead, for convenience, TryLock() returns an RAII-like object
// that does so on destruction.
//
// For example:
//
// OperationsBarrier barrier_;
//
// void TearDown() {
// barrier_.CancelAndWait();
// }
//
// void MaybeRunOperation() {
// auto token = barrier_.TryLock();
// if (token)
// Process();
// }
//
class V8_EXPORT_PRIVATE OperationsBarrier {
public:
// The owner of a Token which evaluates to true can safely perform an
// operation while being certain it happens-before CancelAndWait(). Releasing
// this Token relinquishes this right.
//
// This class is thread-safe.
class Token {
public:
Token() = default;
~Token() {
if (outer_) outer_->Release();
}
Token(const Token&) = delete;
Token(Token&& other) V8_NOEXCEPT {
this->outer_ = other.outer_;
other.outer_ = nullptr;
}
operator bool() const { return !!outer_; }
private:
friend class OperationsBarrier;
explicit Token(OperationsBarrier* outer) : outer_(outer) {}
OperationsBarrier* outer_ = nullptr;
};
OperationsBarrier() = default;
// Users must call CancelAndWait() before destroying an instance of this
// class.
~OperationsBarrier() { DCHECK(cancelled_); }
OperationsBarrier(const OperationsBarrier&) = delete;
OperationsBarrier& operator=(const OperationsBarrier&) = delete;
// Returns an RAII-like object that implicitly converts to true if operations
// are allowed, i.e. if this call happens-before CancelAndWait(); otherwise the
// object converts to false. On success, this OperationsBarrier will keep track
// of the operation until the returned object goes out of scope.
Token TryLock();
// Prevents further calls to TryLock() from succeeding and waits for
// all the ongoing operations to complete.
//
// Attention: Can only be called once.
void CancelAndWait();
bool cancelled() const { return cancelled_; }
private:
void Release();
// Mutex and condition variable enabling concurrent registering and removing
// of operations, as well as waiting for outstanding operations in
// {CancelAndWait}.
base::Mutex mutex_;
base::ConditionVariable release_condition_;
bool cancelled_ = false;
size_t operations_count_{0};
};
} // namespace internal
} // namespace v8
#endif // V8_TASKS_OPERATIONS_BARRIER_H_
......@@ -20,6 +20,7 @@
#include "src/logging/counters.h"
#include "src/logging/metrics.h"
#include "src/objects/property-descriptor.h"
#include "src/tasks/operations-barrier.h"
#include "src/tasks/task-utils.h"
#include "src/tracing/trace-event.h"
#include "src/trap-handler/trap-handler.h"
......@@ -530,6 +531,13 @@ class CompilationStateImpl {
public:
CompilationStateImpl(const std::shared_ptr<NativeModule>& native_module,
std::shared_ptr<Counters> async_counters);
~CompilationStateImpl() {
// It is safe to access current_compile_job_ without a lock since this is
// the last reference.
if (current_compile_job_ && current_compile_job_->IsValid()) {
current_compile_job_->CancelAndDetach();
}
}
// Cancel all background compilation, without waiting for compile tasks to
// finish.
......@@ -677,7 +685,7 @@ class CompilationStateImpl {
bool has_priority_ = false;
std::shared_ptr<JobHandle> current_compile_job_;
std::unique_ptr<JobHandle> current_compile_job_;
// Features detected to be used in this module. Features can be detected
// as a module is being compiled.
......@@ -1175,7 +1183,7 @@ CompilationExecutionResult ExecuteJSToWasmWrapperCompilationUnits(
{
BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
if (compile_scope.cancelled()) return kYield;
wrapper_unit = compile_scope.compilation_state()
->GetNextJSToWasmWrapperCompilationUnit();
if (!wrapper_unit) return kNoMoreUnits;
......@@ -1187,7 +1195,7 @@ CompilationExecutionResult ExecuteJSToWasmWrapperCompilationUnits(
++num_processed_wrappers;
bool yield = delegate && delegate->ShouldYield();
BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
if (compile_scope.cancelled()) return kYield;
if (yield ||
!(wrapper_unit = compile_scope.compilation_state()
->GetNextJSToWasmWrapperCompilationUnit())) {
......@@ -1218,8 +1226,9 @@ const char* GetCompilationEventName(const WasmCompilationUnit& unit,
// Run by the {BackgroundCompileJob} (on any thread).
CompilationExecutionResult ExecuteCompilationUnits(
std::weak_ptr<NativeModule> native_module, Counters* counters,
JobDelegate* delegate, CompileBaselineOnly baseline_only) {
std::weak_ptr<NativeModule> native_module, WasmEngine* wasm_engine,
Counters* counters, JobDelegate* delegate,
CompileBaselineOnly baseline_only) {
TRACE_EVENT0("v8.wasm", "wasm.ExecuteCompilationUnits");
// Execute JS to Wasm wrapper units first, so that they are ready to be
......@@ -1235,7 +1244,6 @@ CompilationExecutionResult ExecuteCompilationUnits(
base::Optional<CompilationEnv> env;
std::shared_ptr<WireBytesStorage> wire_bytes;
std::shared_ptr<const WasmModule> module;
WasmEngine* wasm_engine;
// Task 0 is any main thread (there might be multiple from multiple isolates),
// worker threads start at 1 (thus the "+ 1").
int task_id = delegate ? (int{delegate->GetTaskId()} + 1) : 0;
......@@ -1249,14 +1257,13 @@ CompilationExecutionResult ExecuteCompilationUnits(
// compilation unit.
{
BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
auto* compilation_state = compile_scope.compilation_state();
if (compile_scope.cancelled()) return kYield;
env.emplace(compile_scope.native_module()->CreateCompilationEnv());
wire_bytes = compilation_state->GetWireBytesStorage();
wire_bytes = compile_scope.compilation_state()->GetWireBytesStorage();
module = compile_scope.native_module()->shared_module();
wasm_engine = compile_scope.native_module()->engine();
queue = compilation_state->GetQueueForCompileTask(task_id);
unit = compilation_state->GetNextCompilationUnit(queue, baseline_only);
queue = compile_scope.compilation_state()->GetQueueForCompileTask(task_id);
unit = compile_scope.compilation_state()->GetNextCompilationUnit(
queue, baseline_only);
if (!unit) return kNoMoreUnits;
}
TRACE_COMPILE("ExecuteCompilationUnits (task id %d)\n", task_id);
......@@ -1276,7 +1283,7 @@ CompilationExecutionResult ExecuteCompilationUnits(
// (synchronized): Publish the compilation result and get the next unit.
BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
if (compile_scope.cancelled()) return kYield;
if (!results_to_publish.back().succeeded()) {
compile_scope.compilation_state()->SetError();
......@@ -1574,29 +1581,37 @@ void CompileNativeModule(Isolate* isolate,
class BackgroundCompileJob final : public JobTask {
public:
explicit BackgroundCompileJob(std::weak_ptr<NativeModule> native_module,
WasmEngine* engine,
std::shared_ptr<Counters> async_counters)
: native_module_(std::move(native_module)),
engine_(engine),
engine_barrier_(engine_->GetBarrierForBackgroundCompile()),
async_counters_(std::move(async_counters)) {}
void Run(JobDelegate* delegate) override {
ExecuteCompilationUnits(native_module_, async_counters_.get(), delegate,
kBaselineOrTopTier);
auto engine_scope = engine_barrier_->TryLock();
if (!engine_scope) return;
ExecuteCompilationUnits(native_module_, engine_, async_counters_.get(),
delegate, kBaselineOrTopTier);
}
size_t GetMaxConcurrency(size_t worker_count) const override {
BackgroundCompileScope scope(native_module_);
if (scope.cancelled()) return 0;
BackgroundCompileScope compile_scope(native_module_);
if (compile_scope.cancelled()) return 0;
// NumOutstandingCompilations() does not reflect the units that running
// workers are processing, thus add the current worker count to that number.
size_t flag_limit =
static_cast<size_t>(std::max(1, FLAG_wasm_num_compilation_tasks));
return std::min(
flag_limit,
worker_count + scope.compilation_state()->NumOutstandingCompilations());
worker_count +
compile_scope.compilation_state()->NumOutstandingCompilations());
}
private:
const std::weak_ptr<NativeModule> native_module_;
std::weak_ptr<NativeModule> native_module_;
WasmEngine* engine_;
std::shared_ptr<OperationsBarrier> engine_barrier_;
const std::shared_ptr<Counters> async_counters_;
};
......@@ -2722,11 +2737,12 @@ CompilationStateImpl::CompilationStateImpl(
compilation_unit_queues_(native_module->num_functions()) {}
void CompilationStateImpl::CancelCompilation() {
// No more callbacks after abort.
base::MutexGuard callbacks_guard(&callbacks_mutex_);
// std::memory_order_relaxed is sufficient because no other state is
// synchronized with |compile_cancelled_|.
compile_cancelled_.store(true, std::memory_order_relaxed);
// No more callbacks after abort.
base::MutexGuard callbacks_guard(&callbacks_mutex_);
callbacks_.clear();
}
......@@ -3230,32 +3246,28 @@ void CompilationStateImpl::SchedulePublishCompilationResults(
void CompilationStateImpl::ScheduleCompileJobForNewUnits() {
if (failed()) return;
std::shared_ptr<JobHandle> new_job_handle;
{
base::MutexGuard guard(&mutex_);
if (current_compile_job_ && current_compile_job_->IsValid()) {
current_compile_job_->NotifyConcurrencyIncrease();
if (!current_compile_job_ || !current_compile_job_->IsValid()) {
WasmEngine* engine = native_module_->engine();
std::unique_ptr<JobTask> new_compile_job =
std::make_unique<BackgroundCompileJob>(native_module_weak_, engine,
async_counters_);
// TODO(wasm): Lower priority for TurboFan-only jobs.
current_compile_job_ = V8::GetCurrentPlatform()->PostJob(
has_priority_ ? TaskPriority::kUserBlocking
: TaskPriority::kUserVisible,
std::move(new_compile_job));
// Reset the priority. Later uses of the compilation state, e.g. for
// debugging, should compile with the default priority again.
has_priority_ = false;
return;
}
std::unique_ptr<JobTask> new_compile_job =
std::make_unique<BackgroundCompileJob>(native_module_weak_,
async_counters_);
// TODO(wasm): Lower priority for TurboFan-only jobs.
new_job_handle = V8::GetCurrentPlatform()->PostJob(
has_priority_ ? TaskPriority::kUserBlocking
: TaskPriority::kUserVisible,
std::move(new_compile_job));
current_compile_job_ = new_job_handle;
// Reset the priority. Later uses of the compilation state, e.g. for
// debugging, should compile with the default priority again.
has_priority_ = false;
}
if (new_job_handle) {
native_module_->engine()->ShepherdCompileJobHandle(
std::move(new_job_handle));
}
// Once initialized, |current_compile_job_| is never cleared (except in tests,
// where it's done synchronously).
current_compile_job_->NotifyConcurrencyIncrease();
}
size_t CompilationStateImpl::NumOutstandingCompilations() const {
......@@ -3294,8 +3306,8 @@ void CompilationStateImpl::WaitForCompilationEvent(
}
constexpr JobDelegate* kNoDelegate = nullptr;
ExecuteCompilationUnits(native_module_weak_, async_counters_.get(),
kNoDelegate, kBaselineOnly);
ExecuteCompilationUnits(native_module_weak_, native_module_->engine(),
async_counters_.get(), kNoDelegate, kBaselineOnly);
compilation_event_semaphore->Wait();
}
......
......@@ -396,32 +396,7 @@ WasmEngine::~WasmEngine() {
gdb_server_.reset();
#endif // V8_ENABLE_WASM_GDB_REMOTE_DEBUGGING
// Collect the live modules into a vector first, then cancel them while
// releasing our lock. This will allow the background tasks to finish.
std::vector<std::shared_ptr<NativeModule>> live_modules;
{
base::MutexGuard guard(&mutex_);
for (auto& entry : native_modules_) {
if (auto shared_ptr = entry.second->weak_ptr.lock()) {
live_modules.emplace_back(std::move(shared_ptr));
}
}
}
for (auto& native_module : live_modules) {
native_module->compilation_state()->CancelCompilation();
}
live_modules.clear();
// Now wait for all background compile tasks to actually finish.
std::vector<std::shared_ptr<JobHandle>> compile_job_handles;
{
base::MutexGuard guard(&mutex_);
compile_job_handles = compile_job_handles_;
}
for (auto& job_handle : compile_job_handles) {
if (job_handle->IsValid()) job_handle->Cancel();
}
operations_barrier_->CancelAndWait();
// All AsyncCompileJobs have been canceled.
DCHECK(async_compile_jobs_.empty());
......@@ -1334,12 +1309,9 @@ Handle<Script> WasmEngine::GetOrCreateScript(
}
}
void WasmEngine::ShepherdCompileJobHandle(
std::shared_ptr<JobHandle> job_handle) {
DCHECK_NOT_NULL(job_handle);
base::MutexGuard guard(&mutex_);
// TODO(clemensb): Add occasional cleanup of finished handles.
compile_job_handles_.emplace_back(std::move(job_handle));
std::shared_ptr<OperationsBarrier>
WasmEngine::GetBarrierForBackgroundCompile() {
return operations_barrier_;
}
void WasmEngine::TriggerGC(int8_t gc_sequence_index) {
......
......@@ -14,6 +14,7 @@
#include "src/base/platform/condition-variable.h"
#include "src/base/platform/mutex.h"
#include "src/tasks/cancelable-task.h"
#include "src/tasks/operations-barrier.h"
#include "src/wasm/wasm-code-manager.h"
#include "src/wasm/wasm-tier.h"
#include "src/zone/accounting-allocator.h"
......@@ -334,9 +335,9 @@ class V8_EXPORT_PRIVATE WasmEngine {
const std::shared_ptr<NativeModule>&,
Vector<const char> source_url = {});
// Take shared ownership of a compile job handle, such that we can synchronize
// on that before the engine dies.
void ShepherdCompileJobHandle(std::shared_ptr<JobHandle>);
// Returns a barrier that allows background compile operations; as long as a
// token obtained from it is alive, this WasmEngine will not be destroyed.
std::shared_ptr<OperationsBarrier> GetBarrierForBackgroundCompile();
// Call on process start and exit.
static void InitializeOncePerProcess();
......@@ -399,9 +400,8 @@ class V8_EXPORT_PRIVATE WasmEngine {
std::unordered_map<NativeModule*, std::unique_ptr<NativeModuleInfo>>
native_modules_;
// Background compile jobs that are still running. We need to join them before
// the engine gets deleted. Otherwise we don't care when exactly they finish.
std::vector<std::shared_ptr<JobHandle>> compile_job_handles_;
std::shared_ptr<OperationsBarrier> operations_barrier_{
std::make_shared<OperationsBarrier>()};
// Size of code that became dead since the last GC. If this exceeds a certain
// threshold, a new GC is triggered.
......
......@@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "include/libplatform/libplatform.h"
#include "src/api/api-inl.h"
#include "src/init/v8.h"
#include "src/objects/managed.h"
......@@ -38,7 +39,8 @@ class MockPlatform final : public TestPlatform {
std::unique_ptr<v8::JobHandle> PostJob(
v8::TaskPriority priority,
std::unique_ptr<v8::JobTask> job_task) override {
auto orig_job_handle = TestPlatform::PostJob(priority, std::move(job_task));
auto orig_job_handle = v8::platform::NewDefaultJobHandle(
this, priority, std::move(job_task), 1);
auto job_handle =
std::make_unique<MockJobHandle>(std::move(orig_job_handle), this);
job_handles_.insert(job_handle.get());
......
......@@ -4,6 +4,7 @@
#include <memory>
#include "include/libplatform/libplatform.h"
#include "include/v8-metrics.h"
#include "src/api/api-inl.h"
#include "src/wasm/wasm-module-builder.h"
......@@ -33,7 +34,8 @@ class MockPlatform final : public TestPlatform {
std::unique_ptr<v8::JobHandle> PostJob(
v8::TaskPriority priority,
std::unique_ptr<v8::JobTask> job_task) override {
auto orig_job_handle = TestPlatform::PostJob(priority, std::move(job_task));
auto orig_job_handle = v8::platform::NewDefaultJobHandle(
this, priority, std::move(job_task), 1);
auto job_handle =
std::make_unique<MockJobHandle>(std::move(orig_job_handle), this);
job_handles_.insert(job_handle.get());
......