Commit a74f9eb6 authored by Etienne Pierre-doray's avatar Etienne Pierre-doray Committed by Commit Bot

[wasm]: Use CancelAndDetach and barrier on BackgroundCompileJob.

To avoid keeping around a list of job handles, CancelAndDetach() is
used in CancelCompilation. Dependency on WasmEngine is handled by a
barrier that waits on all jobs to finish.


Change-Id: I685a1737354b2fb3d1f4b98580926a93da38be5b
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2498659
Commit-Queue: Jakob Kummerow <jkummerow@chromium.org>
Reviewed-by: 's avatarUlan Degenbaev <ulan@chromium.org>
Reviewed-by: 's avatarClemens Backes <clemensb@chromium.org>
Reviewed-by: 's avatarJakob Kummerow <jkummerow@chromium.org>
Cr-Commit-Position: refs/heads/master@{#71074}
parent b1d9bbce
......@@ -3307,6 +3307,8 @@ v8_source_set("v8_base_without_compiler") {
"src/strings/uri.h",
"src/tasks/cancelable-task.cc",
"src/tasks/cancelable-task.h",
"src/tasks/operations-barrier.cc",
"src/tasks/operations-barrier.h",
"src/tasks/task-utils.cc",
"src/tasks/task-utils.h",
"src/third_party/siphash/halfsiphash.cc",
......
......@@ -123,7 +123,6 @@ void DefaultJobState::CancelAndWait() {
}
void DefaultJobState::CancelAndDetach() {
base::MutexGuard guard(&mutex_);
is_canceled_.store(true, std::memory_order_relaxed);
}
......
// Copyright 2020 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/tasks/operations-barrier.h"
namespace v8 {
namespace internal {
OperationsBarrier::Token OperationsBarrier::TryLock() {
base::MutexGuard guard(&mutex_);
if (cancelled_) return Token(nullptr);
++operations_count_;
return Token(this);
}
void OperationsBarrier::CancelAndWait() {
base::MutexGuard guard(&mutex_);
DCHECK(!cancelled_);
cancelled_ = true;
while (operations_count_ > 0) {
release_condition_.Wait(&mutex_);
}
}
void OperationsBarrier::Release() {
base::MutexGuard guard(&mutex_);
if (--operations_count_ == 0 && cancelled_) {
release_condition_.NotifyOne();
}
}
} // namespace internal
} // namespace v8
// Copyright 2020 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef V8_TASKS_OPERATIONS_BARRIER_H_
#define V8_TASKS_OPERATIONS_BARRIER_H_
#include <cstdint>
#include "src/base/macros.h"
#include "src/base/platform/condition-variable.h"
#include "src/base/platform/mutex.h"
namespace v8 {
namespace internal {
// A thread-safe barrier to manage lifetime of muti-threaded operations.
//
// The barrier is used to determine if operations are allowed, and to keep track
// of how many are currently active. Users will call TryLock() before starting
// such operations. If the call succeeds the user can run the operation and the
// barrier will keep track of it until the user signals that the operation is
// completed. No operations are allowed after CancelAndWait() is called.
//
// There is no explicit way of telling the barrier when an operation is
// completed, instead for convenience TryLock() will return a RAII
// like object that will do so on destruction.
//
// For example:
//
// OperationsBarrier barrier_;
//
// void TearDown() {
// barrier_.CancelAndWait();
// }
//
// void MaybeRunOperation() {
// auto token = barrier_.TryLock();
// if (token)
// Process();
// }
//
class V8_EXPORT_PRIVATE OperationsBarrier {
public:
// The owner of a Token which evaluates to true can safely perform an
// operation while being certain it happens-before CancelAndWait(). Releasing
// this Token relinquishes this right.
//
// This class is thread-safe
class Token {
public:
Token() = default;
~Token() {
if (outer_) outer_->Release();
}
Token(const Token&) = delete;
Token(Token&& other) V8_NOEXCEPT {
this->outer_ = other.outer_;
other.outer_ = nullptr;
}
operator bool() const { return !!outer_; }
private:
friend class OperationsBarrier;
explicit Token(OperationsBarrier* outer) : outer_(outer) {}
OperationsBarrier* outer_ = nullptr;
};
OperationsBarrier() = default;
// Users must call CancelAndWait() before destroying an instance of this
// class.
~OperationsBarrier() { DCHECK(cancelled_); }
OperationsBarrier(const OperationsBarrier&) = delete;
OperationsBarrier& operator=(const OperationsBarrier&) = delete;
// Returns a RAII like object that implicitly converts to true if operations
// are allowed i.e. if this call happens-before CancelAndWait(), otherwise the
// object will convert to false. On successful return, this OperationsBarrier
// will keep track of the operation until the returned object goes out of
// scope.
Token TryLock();
// Prevents further calls to TryLock() from succeeding and waits for
// all the ongoing operations to complete.
//
// Attention: Can only be called once.
void CancelAndWait();
bool cancelled() const { return cancelled_; }
private:
void Release();
// Mutex and condition variable enabling concurrent register and removing, as
// well as waiting for background tasks on {CancelAndWait}.
base::Mutex mutex_;
base::ConditionVariable release_condition_;
bool cancelled_ = false;
size_t operations_count_{0};
};
} // namespace internal
} // namespace v8
#endif // V8_TASKS_OPERATIONS_BARRIER_H_
......@@ -20,6 +20,7 @@
#include "src/logging/counters.h"
#include "src/logging/metrics.h"
#include "src/objects/property-descriptor.h"
#include "src/tasks/operations-barrier.h"
#include "src/tasks/task-utils.h"
#include "src/tracing/trace-event.h"
#include "src/trap-handler/trap-handler.h"
......@@ -1172,7 +1173,7 @@ CompilationExecutionResult ExecuteJSToWasmWrapperCompilationUnits(
{
BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
if (compile_scope.cancelled()) return kYield;
wrapper_unit = compile_scope.compilation_state()
->GetNextJSToWasmWrapperCompilationUnit();
if (!wrapper_unit) return kNoMoreUnits;
......@@ -1184,7 +1185,7 @@ CompilationExecutionResult ExecuteJSToWasmWrapperCompilationUnits(
++num_processed_wrappers;
bool yield = delegate && delegate->ShouldYield();
BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
if (compile_scope.cancelled()) return kYield;
if (yield ||
!(wrapper_unit = compile_scope.compilation_state()
->GetNextJSToWasmWrapperCompilationUnit())) {
......@@ -1215,8 +1216,9 @@ const char* GetCompilationEventName(const WasmCompilationUnit& unit,
// Run by the {BackgroundCompileJob} (on any thread).
CompilationExecutionResult ExecuteCompilationUnits(
std::weak_ptr<NativeModule> native_module, Counters* counters,
JobDelegate* delegate, CompileBaselineOnly baseline_only) {
std::weak_ptr<NativeModule> native_module, WasmEngine* wasm_engine,
Counters* counters, JobDelegate* delegate,
CompileBaselineOnly baseline_only) {
TRACE_EVENT0("v8.wasm", "wasm.ExecuteCompilationUnits");
// Execute JS to Wasm wrapper units first, so that they are ready to be
......@@ -1232,7 +1234,6 @@ CompilationExecutionResult ExecuteCompilationUnits(
base::Optional<CompilationEnv> env;
std::shared_ptr<WireBytesStorage> wire_bytes;
std::shared_ptr<const WasmModule> module;
WasmEngine* wasm_engine;
// Task 0 is any main thread (there might be multiple from multiple isolates),
// worker threads start at 1 (thus the "+ 1").
int task_id = delegate ? (int{delegate->GetTaskId()} + 1) : 0;
......@@ -1246,14 +1247,13 @@ CompilationExecutionResult ExecuteCompilationUnits(
// compilation unit.
{
BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
auto* compilation_state = compile_scope.compilation_state();
if (compile_scope.cancelled()) return kYield;
env.emplace(compile_scope.native_module()->CreateCompilationEnv());
wire_bytes = compilation_state->GetWireBytesStorage();
wire_bytes = compile_scope.compilation_state()->GetWireBytesStorage();
module = compile_scope.native_module()->shared_module();
wasm_engine = compile_scope.native_module()->engine();
queue = compilation_state->GetQueueForCompileTask(task_id);
unit = compilation_state->GetNextCompilationUnit(queue, baseline_only);
queue = compile_scope.compilation_state()->GetQueueForCompileTask(task_id);
unit = compile_scope.compilation_state()->GetNextCompilationUnit(
queue, baseline_only);
if (!unit) return kNoMoreUnits;
}
TRACE_COMPILE("ExecuteCompilationUnits (task id %d)\n", task_id);
......@@ -1273,7 +1273,7 @@ CompilationExecutionResult ExecuteCompilationUnits(
// (synchronized): Publish the compilation result and get the next unit.
BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
if (compile_scope.cancelled()) return kYield;
if (!results_to_publish.back().succeeded()) {
compile_scope.compilation_state()->SetError();
......@@ -1574,29 +1574,37 @@ void CompileNativeModule(Isolate* isolate,
class BackgroundCompileJob final : public JobTask {
public:
explicit BackgroundCompileJob(std::weak_ptr<NativeModule> native_module,
WasmEngine* engine,
std::shared_ptr<Counters> async_counters)
: native_module_(std::move(native_module)),
engine_(engine),
engine_barrier_(engine_->GetBarrierForBackgroundCompile()),
async_counters_(std::move(async_counters)) {}
void Run(JobDelegate* delegate) override {
ExecuteCompilationUnits(native_module_, async_counters_.get(), delegate,
kBaselineOrTopTier);
auto engine_scope = engine_barrier_->TryLock();
if (!engine_scope) return;
ExecuteCompilationUnits(native_module_, engine_, async_counters_.get(),
delegate, kBaselineOrTopTier);
}
size_t GetMaxConcurrency(size_t worker_count) const override {
BackgroundCompileScope scope(native_module_);
if (scope.cancelled()) return 0;
BackgroundCompileScope compile_scope(native_module_);
if (compile_scope.cancelled()) return 0;
// NumOutstandingCompilations() does not reflect the units that running
// workers are processing, thus add the current worker count to that number.
size_t flag_limit =
static_cast<size_t>(std::max(1, FLAG_wasm_num_compilation_tasks));
return std::min(
flag_limit,
worker_count + scope.compilation_state()->NumOutstandingCompilations());
worker_count +
compile_scope.compilation_state()->NumOutstandingCompilations());
}
private:
const std::weak_ptr<NativeModule> native_module_;
std::weak_ptr<NativeModule> native_module_;
WasmEngine* engine_;
std::shared_ptr<OperationsBarrier> engine_barrier_;
const std::shared_ptr<Counters> async_counters_;
};
......@@ -2724,11 +2732,15 @@ CompilationStateImpl::CompilationStateImpl(
compilation_unit_queues_(native_module->num_functions()) {}
void CompilationStateImpl::CancelCompilation() {
// No more callbacks after abort.
base::MutexGuard callbacks_guard(&callbacks_mutex_);
// std::memory_order_relaxed is sufficient because no other state is
// synchronized with |compile_cancelled_|.
compile_cancelled_.store(true, std::memory_order_relaxed);
if (current_compile_job_ && current_compile_job_->IsValid()) {
current_compile_job_->CancelAndDetach();
}
// No more callbacks after abort.
base::MutexGuard callbacks_guard(&callbacks_mutex_);
callbacks_.clear();
}
......@@ -2930,7 +2942,7 @@ void CompilationStateImpl::AddCompilationUnits(
if (!js_to_wasm_wrapper_units.empty()) {
// |js_to_wasm_wrapper_units_| can only be modified before background
// compilation started.
DCHECK(!current_compile_job_ || !current_compile_job_->IsRunning());
DCHECK(!current_compile_job_ || !current_compile_job_->IsValid());
js_to_wasm_wrapper_units_.insert(js_to_wasm_wrapper_units_.end(),
js_to_wasm_wrapper_units.begin(),
js_to_wasm_wrapper_units.end());
......@@ -3230,20 +3242,20 @@ void CompilationStateImpl::SchedulePublishCompilationResults(
}
void CompilationStateImpl::ScheduleCompileJobForNewUnits() {
if (failed()) return;
if (current_compile_job_ && current_compile_job_->IsValid()) {
current_compile_job_->NotifyConcurrencyIncrease();
return;
}
if (failed()) return;
WasmEngine* engine = native_module_->engine();
std::unique_ptr<JobTask> new_compile_job =
std::make_unique<BackgroundCompileJob>(native_module_weak_,
std::make_unique<BackgroundCompileJob>(native_module_weak_, engine,
async_counters_);
// TODO(wasm): Lower priority for TurboFan-only jobs.
current_compile_job_ = V8::GetCurrentPlatform()->PostJob(
has_priority_ ? TaskPriority::kUserBlocking : TaskPriority::kUserVisible,
std::move(new_compile_job));
native_module_->engine()->ShepherdCompileJobHandle(current_compile_job_);
// Reset the priority. Later uses of the compilation state, e.g. for
// debugging, should compile with the default priority again.
......@@ -3286,8 +3298,8 @@ void CompilationStateImpl::WaitForCompilationEvent(
}
constexpr JobDelegate* kNoDelegate = nullptr;
ExecuteCompilationUnits(native_module_weak_, async_counters_.get(),
kNoDelegate, kBaselineOnly);
ExecuteCompilationUnits(native_module_weak_, native_module_->engine(),
async_counters_.get(), kNoDelegate, kBaselineOnly);
compilation_event_semaphore->Wait();
}
......
......@@ -395,32 +395,7 @@ WasmEngine::~WasmEngine() {
gdb_server_.reset();
#endif // V8_ENABLE_WASM_GDB_REMOTE_DEBUGGING
// Collect the live modules into a vector first, then cancel them while
// releasing our lock. This will allow the background tasks to finish.
std::vector<std::shared_ptr<NativeModule>> live_modules;
{
base::MutexGuard guard(&mutex_);
for (auto& entry : native_modules_) {
if (auto shared_ptr = entry.second->weak_ptr.lock()) {
live_modules.emplace_back(std::move(shared_ptr));
}
}
}
for (auto& native_module : live_modules) {
native_module->compilation_state()->CancelCompilation();
}
live_modules.clear();
// Now wait for all background compile tasks to actually finish.
std::vector<std::shared_ptr<JobHandle>> compile_job_handles;
{
base::MutexGuard guard(&mutex_);
compile_job_handles = compile_job_handles_;
}
for (auto& job_handle : compile_job_handles) {
if (job_handle->IsValid()) job_handle->Cancel();
}
operations_barrier_->CancelAndWait();
// All AsyncCompileJobs have been canceled.
DCHECK(async_compile_jobs_.empty());
......@@ -1333,12 +1308,9 @@ Handle<Script> WasmEngine::GetOrCreateScript(
}
}
void WasmEngine::ShepherdCompileJobHandle(
std::shared_ptr<JobHandle> job_handle) {
DCHECK_NOT_NULL(job_handle);
base::MutexGuard guard(&mutex_);
// TODO(clemensb): Add occasional cleanup of finished handles.
compile_job_handles_.emplace_back(std::move(job_handle));
std::shared_ptr<OperationsBarrier>
WasmEngine::GetBarrierForBackgroundCompile() {
return operations_barrier_;
}
void WasmEngine::TriggerGC(int8_t gc_sequence_index) {
......
......@@ -14,6 +14,7 @@
#include "src/base/platform/condition-variable.h"
#include "src/base/platform/mutex.h"
#include "src/tasks/cancelable-task.h"
#include "src/tasks/operations-barrier.h"
#include "src/wasm/wasm-code-manager.h"
#include "src/wasm/wasm-tier.h"
#include "src/zone/accounting-allocator.h"
......@@ -334,9 +335,9 @@ class V8_EXPORT_PRIVATE WasmEngine {
const std::shared_ptr<NativeModule>&,
Vector<const char> source_url = {});
// Take shared ownership of a compile job handle, such that we can synchronize
// on that before the engine dies.
void ShepherdCompileJobHandle(std::shared_ptr<JobHandle>);
// Returns a barrier allowing background compile operations if valid and
// preventing this object from being destroyed.
std::shared_ptr<OperationsBarrier> GetBarrierForBackgroundCompile();
// Call on process start and exit.
static void InitializeOncePerProcess();
......@@ -399,9 +400,8 @@ class V8_EXPORT_PRIVATE WasmEngine {
std::unordered_map<NativeModule*, std::unique_ptr<NativeModuleInfo>>
native_modules_;
// Background compile jobs that are still running. We need to join them before
// the engine gets deleted. Otherwise we don't care when exactly they finish.
std::vector<std::shared_ptr<JobHandle>> compile_job_handles_;
std::shared_ptr<OperationsBarrier> operations_barrier_{
std::make_shared<OperationsBarrier>()};
// Size of code that became dead since the last GC. If this exceeds a certain
// threshold, a new GC is triggered.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment