Commit e6b2d673 authored by Etienne Pierre-Doray, committed by Commit Bot

[wasm] Avoid lock in BackgroundCompileToken

Most code protected by compilation_scope_mutex_ is already either
thread-safe or can run in parallel. Removing the lock reduces
contention. Note that weak_ptr::lock is atomic and thus still prevents
deletion of NativeModule and CompilationStateImpl for the duration of a
BackgroundCompileScope.
Related changes:
- BackgroundCompileToken is deleted and publish_queue is moved to
  CompilationStateImpl.
- Some of the (non-thread-safe) logic in publish_results is moved into
  PublishCompilationResults so that it is serialized on the single
  thread running the publisher.
- Cancellation is handled by an atomic bool and is no longer
  synchronized. This means compilation may be cancelled while a worker
  thread is still running; that thread only stops once it reaches a
  new BackgroundCompileScope.

Change-Id: I9651e924857c583d1a0fe5b9ffa99bfd01a8bda4
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2442192
Reviewed-by: Clemens Backes <clemensb@chromium.org>
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Cr-Commit-Position: refs/heads/master@{#70574}
parent 0403beb4
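
The core change can be illustrated with a minimal, hypothetical sketch (simplified names; Module and CompileScope are not the actual V8 classes): a scope pins the module via weak_ptr::lock, which is atomic, instead of taking a shared mutex, and cancellation becomes a relaxed atomic flag that a worker only observes when it opens its next scope.

    #include <atomic>
    #include <memory>

    struct Module {
      std::atomic<bool> cancelled{false};
    };

    class CompileScope {
     public:
      explicit CompileScope(const std::weak_ptr<Module>& weak)
          : module_(weak.lock()) {}  // atomic; null if the module is gone

      // Cancelled if the module was destroyed or the flag was set. A worker
      // that is mid-unit keeps running and only notices this at its next
      // CompileScope.
      bool cancelled() const {
        return module_ == nullptr ||
               module_->cancelled.load(std::memory_order_relaxed);
      }

      Module* module() const { return module_.get(); }

     private:
      const std::shared_ptr<Module> module_;  // pins the module for this scope
    };
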
@@ -79,105 +79,24 @@ enum class CompileStrategy : uint8_t {
kDefault = kEager,
};
// Background compile jobs hold a shared pointer to this token. The token is
// used to notify them that they should stop. As soon as they see this (after
// finishing their current compilation unit), they will stop.
// This allows to already remove the NativeModule without having to synchronize
// on background compile jobs.
class BackgroundCompileToken {
public:
explicit BackgroundCompileToken(
const std::shared_ptr<NativeModule>& native_module)
: native_module_(native_module) {}
void Cancel() {
base::SharedMutexGuard<base::kExclusive> mutex_guard(
&compilation_scope_mutex_);
native_module_.reset();
}
private:
friend class BackgroundCompileScope;
std::shared_ptr<NativeModule> StartScope() {
compilation_scope_mutex_.LockShared();
return native_module_.lock();
}
// This private method can only be called via {BackgroundCompileScope}.
void SchedulePublishCode(NativeModule* native_module,
std::vector<std::unique_ptr<WasmCode>> codes) {
{
base::MutexGuard guard(&publish_mutex_);
if (publisher_running_) {
// Add new code to the queue and return.
publish_queue_.reserve(publish_queue_.size() + codes.size());
for (auto& c : codes) publish_queue_.emplace_back(std::move(c));
return;
}
publisher_running_ = true;
}
while (true) {
PublishCode(native_module, VectorOf(codes));
codes.clear();
// Keep publishing new code that came in.
base::MutexGuard guard(&publish_mutex_);
DCHECK(publisher_running_);
if (publish_queue_.empty()) {
publisher_running_ = false;
return;
}
codes.swap(publish_queue_);
}
}
void PublishCode(NativeModule*, Vector<std::unique_ptr<WasmCode>>);
void ExitScope() { compilation_scope_mutex_.UnlockShared(); }
// {compilation_scope_mutex_} protects {native_module_}.
base::SharedMutex compilation_scope_mutex_;
std::weak_ptr<NativeModule> native_module_;
// {publish_mutex_} protects {publish_queue_} and {publisher_running_}.
base::Mutex publish_mutex_;
std::vector<std::unique_ptr<WasmCode>> publish_queue_;
bool publisher_running_ = false;
};
class CompilationStateImpl;
// Keep these scopes short, as they hold the mutex of the token, which
// sequentializes all these scopes. The mutex is also acquired from foreground
// tasks, which should not be blocked for a long time.
class BackgroundCompileScope {
public:
explicit BackgroundCompileScope(
const std::shared_ptr<BackgroundCompileToken>& token)
: token_(token.get()), native_module_(token->StartScope()) {}
~BackgroundCompileScope() { token_->ExitScope(); }
explicit BackgroundCompileScope(std::weak_ptr<NativeModule> native_module)
: native_module_(native_module.lock()) {}
bool cancelled() const { return native_module_ == nullptr; }
NativeModule* native_module() {
DCHECK(!cancelled());
NativeModule* native_module() const {
DCHECK(native_module_);
return native_module_.get();
}
inline CompilationStateImpl* compilation_state() const;
inline CompilationStateImpl* compilation_state();
// Call {SchedulePublishCode} via the {BackgroundCompileScope} to guarantee
// that the {NativeModule} stays alive.
void SchedulePublishCode(std::vector<std::unique_ptr<WasmCode>> codes) {
token_->SchedulePublishCode(native_module_.get(), std::move(codes));
}
bool cancelled() const;
private:
BackgroundCompileToken* const token_;
// Keep the native module alive while in this scope.
std::shared_ptr<NativeModule> const native_module_;
std::shared_ptr<NativeModule> native_module_;
};
enum CompileBaselineOnly : bool {
@@ -577,6 +496,7 @@ class CompilationStateImpl {
// Cancel all background compilation, without waiting for compile tasks to
// finish.
void CancelCompilation();
bool cancelled() const;
// Initialize compilation progress. Set compilation tiers to expect for
// baseline and top tier compilation. Must be set before {AddCompilationUnits}
@@ -626,6 +546,8 @@ class CompilationStateImpl {
void OnCompilationStopped(const WasmFeatures& detected);
void PublishDetectedFeatures(Isolate*);
void SchedulePublishCompilationResults(
std::vector<std::unique_ptr<WasmCode>> unpublished_code);
// Ensure that a compilation job is running, and increase its concurrency if
// needed.
void ScheduleCompileJobForNewUnits();
@@ -678,8 +600,12 @@ class CompilationStateImpl {
// Hold the {callbacks_mutex_} when calling this method.
void TriggerCallbacks(base::EnumSet<CompilationEvent> additional_events = {});
void PublishCompilationResults(
std::vector<std::unique_ptr<WasmCode>> unpublished_code);
void PublishCode(Vector<std::unique_ptr<WasmCode>> codes);
NativeModule* const native_module_;
const std::shared_ptr<BackgroundCompileToken> background_compile_token_;
std::weak_ptr<NativeModule> const native_module_weak_;
const CompileMode compile_mode_;
const std::shared_ptr<Counters> async_counters_;
@@ -687,6 +613,10 @@ class CompilationStateImpl {
// using relaxed semantics.
std::atomic<bool> compile_failed_{false};
// True if compilation was cancelled and worker threads should return. This
// flag can be updated and read using relaxed semantics.
std::atomic<bool> compile_cancelled_{false};
CompilationUnitQueues compilation_unit_queues_;
// Index of the next wrapper to compile in {js_to_wasm_wrapper_units_}.
@@ -742,6 +672,11 @@ class CompilationStateImpl {
// End of fields protected by {callbacks_mutex_}.
//////////////////////////////////////////////////////////////////////////////
// {publish_mutex_} protects {publish_queue_} and {publisher_running_}.
base::Mutex publish_mutex_;
std::vector<std::unique_ptr<WasmCode>> publish_queue_;
bool publisher_running_ = false;
// Encoding of fields in the {compilation_progress_} vector.
using RequiredBaselineTierField = base::BitField8<ExecutionTier, 0, 2>;
using RequiredTopTierField = base::BitField8<ExecutionTier, 2, 2>;
@@ -756,21 +691,14 @@ const CompilationStateImpl* Impl(const CompilationState* compilation_state) {
return reinterpret_cast<const CompilationStateImpl*>(compilation_state);
}
CompilationStateImpl* BackgroundCompileScope::compilation_state() {
return Impl(native_module()->compilation_state());
CompilationStateImpl* BackgroundCompileScope::compilation_state() const {
DCHECK(native_module_);
return Impl(native_module_->compilation_state());
}
void BackgroundCompileToken::PublishCode(
NativeModule* native_module, Vector<std::unique_ptr<WasmCode>> code) {
WasmCodeRefScope code_ref_scope;
std::vector<WasmCode*> published_code = native_module->PublishCode(code);
// Defer logging code in case wire bytes were not fully received yet.
if (native_module->HasWireBytes()) {
native_module->engine()->LogCode(VectorOf(published_code));
}
Impl(native_module->compilation_state())
->OnFinishedUnits(VectorOf(published_code));
bool BackgroundCompileScope::cancelled() const {
return native_module_ == nullptr ||
Impl(native_module_->compilation_state())->cancelled();
}
void UpdateFeatureUseCounts(Isolate* isolate, const WasmFeatures& detected) {
@@ -849,8 +777,9 @@ bool CompilationState::recompilation_finished() const {
std::unique_ptr<CompilationState> CompilationState::New(
const std::shared_ptr<NativeModule>& native_module,
std::shared_ptr<Counters> async_counters) {
return std::unique_ptr<CompilationState>(reinterpret_cast<CompilationState*>(
new CompilationStateImpl(native_module, std::move(async_counters))));
return std::unique_ptr<CompilationState>(
reinterpret_cast<CompilationState*>(new CompilationStateImpl(
std::move(native_module), std::move(async_counters))));
}
// End of PIMPL implementation of {CompilationState}.
@@ -1194,13 +1123,12 @@ void RecordStats(const Code code, Counters* counters) {
enum CompilationExecutionResult : int8_t { kNoMoreUnits, kYield };
CompilationExecutionResult ExecuteJSToWasmWrapperCompilationUnits(
const std::shared_ptr<BackgroundCompileToken>& token,
JobDelegate* delegate) {
std::weak_ptr<NativeModule> native_module, JobDelegate* delegate) {
std::shared_ptr<JSToWasmWrapperCompilationUnit> wrapper_unit = nullptr;
int num_processed_wrappers = 0;
{
BackgroundCompileScope compile_scope(token);
BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
wrapper_unit = compile_scope.compilation_state()
->GetNextJSToWasmWrapperCompilationUnit();
@@ -1211,7 +1139,7 @@ CompilationExecutionResult ExecuteJSToWasmWrapperCompilationUnits(
wrapper_unit->Execute();
++num_processed_wrappers;
bool yield = delegate && delegate->ShouldYield();
BackgroundCompileScope compile_scope(token);
BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
if (yield ||
!(wrapper_unit = compile_scope.compilation_state()
@@ -1225,14 +1153,15 @@ CompilationExecutionResult ExecuteJSToWasmWrapperCompilationUnits(
// Run by the {BackgroundCompileJob} (on any thread).
CompilationExecutionResult ExecuteCompilationUnits(
const std::shared_ptr<BackgroundCompileToken>& token, Counters* counters,
std::weak_ptr<NativeModule> native_module, Counters* counters,
JobDelegate* delegate, CompileBaselineOnly baseline_only) {
TRACE_EVENT0("v8.wasm", "wasm.ExecuteCompilationUnits");
// Execute JS to Wasm wrapper units first, so that they are ready to be
// finalized by the main thread when the kFinishedBaselineCompilation event is
// triggered.
if (ExecuteJSToWasmWrapperCompilationUnits(token, delegate) == kYield) {
if (ExecuteJSToWasmWrapperCompilationUnits(native_module, delegate) ==
kYield) {
return kYield;
}
@@ -1252,7 +1181,7 @@ CompilationExecutionResult ExecuteCompilationUnits(
// Preparation (synchronized): Initialize the fields above and get the first
// compilation unit.
{
BackgroundCompileScope compile_scope(token);
BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
auto* compilation_state = compile_scope.compilation_state();
env.emplace(compile_scope.native_module()->CreateCompilationEnv());
@@ -1267,44 +1196,6 @@ CompilationExecutionResult ExecuteCompilationUnits(
TRACE_COMPILE("ExecuteCompilationUnits (task id %d)\n", task_id);
std::vector<WasmCompilationResult> results_to_publish;
auto publish_results = [&results_to_publish](
BackgroundCompileScope* compile_scope) {
TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("v8.wasm.detailed"),
"wasm.PublishCompilationResults", "num_results",
results_to_publish.size());
if (results_to_publish.empty()) return;
std::vector<std::unique_ptr<WasmCode>> unpublished_code =
compile_scope->native_module()->AddCompiledCode(
VectorOf(results_to_publish));
results_to_publish.clear();
// For import wrapper compilation units, add result to the cache.
const NativeModule* native_module = compile_scope->native_module();
int num_imported_functions = native_module->num_imported_functions();
WasmImportWrapperCache* cache = native_module->import_wrapper_cache();
for (const auto& code : unpublished_code) {
int func_index = code->index();
DCHECK_LE(0, func_index);
DCHECK_LT(func_index, native_module->num_functions());
if (func_index < num_imported_functions) {
const FunctionSig* sig =
native_module->module()->functions[func_index].sig;
WasmImportWrapperCache::CacheKey key(
compiler::kDefaultImportCallKind, sig,
static_cast<int>(sig->parameter_count()));
// If two imported functions have the same key, only one of them should
// have been added as a compilation unit. So it is always the first time
// we compile a wrapper for this key here.
DCHECK_NULL((*cache)[key]);
(*cache)[key] = code.get();
code->IncRef();
}
}
compile_scope->SchedulePublishCode(std::move(unpublished_code));
};
bool compilation_failed = false;
while (true) {
// (asynchronous): Execute the compilation.
@@ -1316,7 +1207,7 @@ CompilationExecutionResult ExecuteCompilationUnits(
// (synchronized): Publish the compilation result and get the next unit.
{
BackgroundCompileScope compile_scope(token);
BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
if (!results_to_publish.back().succeeded()) {
@@ -1329,7 +1220,12 @@ CompilationExecutionResult ExecuteCompilationUnits(
if (yield ||
!(unit = compile_scope.compilation_state()->GetNextCompilationUnit(
queue, baseline_only))) {
publish_results(&compile_scope);
std::vector<std::unique_ptr<WasmCode>> unpublished_code =
compile_scope.native_module()->AddCompiledCode(
VectorOf(std::move(results_to_publish)));
results_to_publish.clear();
compile_scope.compilation_state()->SchedulePublishCompilationResults(
std::move(unpublished_code));
compile_scope.compilation_state()->OnCompilationStopped(
detected_features);
return yield ? kYield : kNoMoreUnits;
@@ -1344,14 +1240,18 @@ CompilationExecutionResult ExecuteCompilationUnits(
if (unit->tier() == ExecutionTier::kTurbofan ||
static_cast<int>(results_to_publish.size()) >=
unpublished_units_limit) {
publish_results(&compile_scope);
std::vector<std::unique_ptr<WasmCode>> unpublished_code =
compile_scope.native_module()->AddCompiledCode(
VectorOf(std::move(results_to_publish)));
results_to_publish.clear();
compile_scope.compilation_state()->SchedulePublishCompilationResults(
std::move(unpublished_code));
}
}
}
// We only get here if compilation failed. Other exits return directly.
DCHECK(compilation_failed);
USE(compilation_failed);
token->Cancel();
return kNoMoreUnits;
}
@@ -1607,20 +1507,20 @@ void CompileNativeModule(Isolate* isolate,
}
}
// The runnable task that performs compilations in the background.
class BackgroundCompileJob : public JobTask {
public:
explicit BackgroundCompileJob(std::shared_ptr<BackgroundCompileToken> token,
explicit BackgroundCompileJob(std::weak_ptr<NativeModule> native_module,
std::shared_ptr<Counters> async_counters)
: token_(std::move(token)), async_counters_(std::move(async_counters)) {}
: native_module_(std::move(native_module)),
async_counters_(std::move(async_counters)) {}
void Run(JobDelegate* delegate) override {
ExecuteCompilationUnits(token_, async_counters_.get(), delegate,
ExecuteCompilationUnits(native_module_, async_counters_.get(), delegate,
kBaselineOrTopTier);
}
size_t GetMaxConcurrency(size_t worker_count) const override {
BackgroundCompileScope scope(token_);
BackgroundCompileScope scope(native_module_);
if (scope.cancelled()) return 0;
// NumOutstandingCompilations() does not reflect the units that running
// workers are processing, thus add the current worker count to that number.
@@ -1632,7 +1532,7 @@ class BackgroundCompileJob : public JobTask {
}
private:
const std::shared_ptr<BackgroundCompileToken> token_;
const std::weak_ptr<NativeModule> native_module_;
const std::shared_ptr<Counters> async_counters_;
};
@@ -2751,8 +2651,7 @@ CompilationStateImpl::CompilationStateImpl(
const std::shared_ptr<NativeModule>& native_module,
std::shared_ptr<Counters> async_counters)
: native_module_(native_module.get()),
background_compile_token_(
std::make_shared<BackgroundCompileToken>(native_module)),
native_module_weak_(std::move(native_module)),
compile_mode_(FLAG_wasm_tier_up &&
native_module->module()->origin == kWasmOrigin
? CompileMode::kTiering
@@ -2761,12 +2660,18 @@ CompilationStateImpl::CompilationStateImpl(
compilation_unit_queues_(native_module->num_functions()) {}
void CompilationStateImpl::CancelCompilation() {
background_compile_token_->Cancel();
// No more callbacks after abort.
base::MutexGuard callbacks_guard(&callbacks_mutex_);
// std::memory_order_relaxed is sufficient because no other state is
// synchronized with |compile_cancelled_|.
compile_cancelled_.store(true, std::memory_order_relaxed);
callbacks_.clear();
}
bool CompilationStateImpl::cancelled() const {
return compile_cancelled_.load(std::memory_order_relaxed);
}
void CompilationStateImpl::InitializeCompilationProgress(
bool lazy_module, int num_import_wrappers, int num_export_wrappers) {
DCHECK(!failed());
@@ -2955,9 +2860,14 @@ void CompilationStateImpl::AddCompilationUnits(
compilation_unit_queues_.AddUnits(baseline_units, top_tier_units,
native_module_->module());
}
js_to_wasm_wrapper_units_.insert(js_to_wasm_wrapper_units_.end(),
js_to_wasm_wrapper_units.begin(),
js_to_wasm_wrapper_units.end());
if (!js_to_wasm_wrapper_units.empty()) {
// |js_to_wasm_wrapper_units_| can only be modified before background
// compilation started.
DCHECK(!current_compile_job_ || !current_compile_job_->IsRunning());
js_to_wasm_wrapper_units_.insert(js_to_wasm_wrapper_units_.end(),
js_to_wasm_wrapper_units.begin(),
js_to_wasm_wrapper_units.end());
}
ScheduleCompileJobForNewUnits();
}
@@ -3200,6 +3110,78 @@ void CompilationStateImpl::PublishDetectedFeatures(Isolate* isolate) {
UpdateFeatureUseCounts(isolate, detected_features_);
}
void CompilationStateImpl::PublishCompilationResults(
std::vector<std::unique_ptr<WasmCode>> unpublished_code) {
TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("v8.wasm.detailed"),
"wasm.PublishCompilationResults", "num_results",
unpublished_code.size());
if (unpublished_code.empty()) return;
// For import wrapper compilation units, add result to the cache.
int num_imported_functions = native_module_->num_imported_functions();
WasmImportWrapperCache* cache = native_module_->import_wrapper_cache();
for (const auto& code : unpublished_code) {
int func_index = code->index();
DCHECK_LE(0, func_index);
DCHECK_LT(func_index, native_module_->num_functions());
if (func_index < num_imported_functions) {
const FunctionSig* sig =
native_module_->module()->functions[func_index].sig;
WasmImportWrapperCache::CacheKey key(
compiler::kDefaultImportCallKind, sig,
static_cast<int>(sig->parameter_count()));
// If two imported functions have the same key, only one of them should
// have been added as a compilation unit. So it is always the first time
// we compile a wrapper for this key here.
DCHECK_NULL((*cache)[key]);
(*cache)[key] = code.get();
code->IncRef();
}
}
PublishCode(VectorOf(unpublished_code));
}
void CompilationStateImpl::PublishCode(Vector<std::unique_ptr<WasmCode>> code) {
WasmCodeRefScope code_ref_scope;
std::vector<WasmCode*> published_code =
native_module_->PublishCode(std::move(code));
// Defer logging code in case wire bytes were not fully received yet.
if (native_module_->HasWireBytes()) {
native_module_->engine()->LogCode(VectorOf(published_code));
}
OnFinishedUnits(VectorOf(std::move(published_code)));
}
void CompilationStateImpl::SchedulePublishCompilationResults(
std::vector<std::unique_ptr<WasmCode>> unpublished_code) {
{
base::MutexGuard guard(&publish_mutex_);
if (publisher_running_) {
// Add new code to the queue and return.
publish_queue_.reserve(publish_queue_.size() + unpublished_code.size());
for (auto& c : unpublished_code) {
publish_queue_.emplace_back(std::move(c));
}
return;
}
publisher_running_ = true;
}
while (true) {
PublishCompilationResults(std::move(unpublished_code));
unpublished_code.clear();
// Keep publishing new code that came in.
base::MutexGuard guard(&publish_mutex_);
DCHECK(publisher_running_);
if (publish_queue_.empty()) {
publisher_running_ = false;
return;
}
unpublished_code.swap(publish_queue_);
}
}
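
(The scheduling function above follows a baton-passing pattern: any thread may enqueue results, but exactly one thread at a time drains the queue, so the non-thread-safe publishing logic stays serialized without a dedicated publisher thread. A distilled, hypothetical sketch of that pattern, with simplified element type and invented names:)

    #include <mutex>
    #include <vector>

    class Publisher {
     public:
      void Schedule(std::vector<int> batch) {
        {
          std::lock_guard<std::mutex> guard(mutex_);
          if (running_) {
            // Another thread is already publishing; hand our batch to it.
            for (int item : batch) queue_.push_back(item);
            return;
          }
          running_ = true;  // take the baton
        }
        while (true) {
          Publish(batch);  // runs outside the lock, on exactly one thread
          batch.clear();
          std::lock_guard<std::mutex> guard(mutex_);
          if (queue_.empty()) {
            running_ = false;  // release the baton
            return;
          }
          batch.swap(queue_);  // keep draining work enqueued meanwhile
        }
      }

     private:
      void Publish(const std::vector<int>& batch) { /* serialized work */ }

      std::mutex mutex_;
      std::vector<int> queue_;
      bool running_ = false;
    };
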
void CompilationStateImpl::ScheduleCompileJobForNewUnits() {
if (current_compile_job_ && current_compile_job_->IsRunning()) {
current_compile_job_->NotifyConcurrencyIncrease();
@@ -3208,7 +3190,7 @@ void CompilationStateImpl::ScheduleCompileJobForNewUnits() {
if (failed()) return;
std::unique_ptr<JobTask> new_compile_job =
std::make_unique<BackgroundCompileJob>(background_compile_token_,
std::make_unique<BackgroundCompileJob>(native_module_weak_,
async_counters_);
// TODO(wasm): Lower priority for TurboFan-only jobs.
current_compile_job_ = V8::GetCurrentPlatform()->PostJob(
@@ -3227,12 +3209,14 @@ size_t CompilationStateImpl::NumOutstandingCompilations() const {
}
void CompilationStateImpl::SetError() {
compile_cancelled_.store(true, std::memory_order_relaxed);
if (compile_failed_.exchange(true, std::memory_order_relaxed)) {
return; // Already failed before.
}
base::MutexGuard callbacks_guard(&callbacks_mutex_);
TriggerCallbacks();
callbacks_.clear();
}
void CompilationStateImpl::WaitForCompilationEvent(
@@ -3250,7 +3234,7 @@ void CompilationStateImpl::WaitForCompilationEvent(
}
constexpr JobDelegate* kNoDelegate = nullptr;
ExecuteCompilationUnits(background_compile_token_, async_counters_.get(),
ExecuteCompilationUnits(native_module_weak_, async_counters_.get(),
kNoDelegate, kBaselineOnly);
compilation_event_semaphore->Wait();
}