Commit 399504d2 authored by Clemens Backes's avatar Clemens Backes Committed by Commit Bot

Revert "[wasm] Use NumOutstandingCompilations() in BackgroundCompileJob:GetMaxConcurrency()"

This reverts commit 84eec6e9.

Reason for revert: TSan issues (lock order inversion): https://ci.chromium.org/p/v8/builders/ci/V8%20Linux64%20TSAN%20-%20isolates/11658

Original change's description:
> [wasm] Use NumOutstandingCompilations() in BackgroundCompileJob:GetMaxConcurrency()
>
> This simplifies current_compile_job_ since ScheduleCompileJobForNewUnits
> is only called on the main thread.
>
> From pinpoint:
> v8:wasm:sync_instantiate:wall_time: 19.1% improvement
> v8-gc-incremental-step: 20.5% improvement
> https://pinpoint-dot-chromeperf.appspot.com/job/152920d8520000
>
> Change-Id: Id560080937f5439cf3321ce9306c7cae49e74798
> Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2442383
> Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
> Reviewed-by: Clemens Backes <clemensb@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#70386}

TBR=clemensb@chromium.org,etiennep@chromium.org

Change-Id: Iaa7df7fbfc56fcc7bf8400671f13210a1984885f
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2456768Reviewed-by: 's avatarClemens Backes <clemensb@chromium.org>
Commit-Queue: Clemens Backes <clemensb@chromium.org>
Cr-Commit-Position: refs/heads/master@{#70388}
parent 4848de2a
...@@ -540,6 +540,39 @@ class CompilationUnitQueues { ...@@ -540,6 +540,39 @@ class CompilationUnitQueues {
} }
}; };
// {JobHandle} is not thread safe in general (at least both the
// {DefaultJobHandle} and chromium's {base::JobHandle} are not). Hence, protect
// concurrent accesses via a mutex.
class ThreadSafeJobHandle {
public:
explicit ThreadSafeJobHandle(std::shared_ptr<JobHandle> job_handle)
: job_handle_(std::move(job_handle)) {}
void NotifyConcurrencyIncrease() {
base::MutexGuard guard(&mutex_);
job_handle_->NotifyConcurrencyIncrease();
}
void Join() {
base::MutexGuard guard(&mutex_);
job_handle_->Join();
}
void Cancel() {
base::MutexGuard guard(&mutex_);
job_handle_->Cancel();
}
bool IsRunning() const {
base::MutexGuard guard(&mutex_);
return job_handle_->IsRunning();
}
private:
mutable base::Mutex mutex_;
std::shared_ptr<JobHandle> job_handle_;
};
// The {CompilationStateImpl} keeps track of the compilation state of the // The {CompilationStateImpl} keeps track of the compilation state of the
// owning NativeModule, i.e. which functions are left to be compiled. // owning NativeModule, i.e. which functions are left to be compiled.
// It contains a task manager to allow parallel and asynchronous background // It contains a task manager to allow parallel and asynchronous background
...@@ -602,7 +635,7 @@ class CompilationStateImpl { ...@@ -602,7 +635,7 @@ class CompilationStateImpl {
void PublishDetectedFeatures(Isolate*); void PublishDetectedFeatures(Isolate*);
// Ensure that a compilation job is running, and increase its concurrency if // Ensure that a compilation job is running, and increase its concurrency if
// needed. // needed.
void ScheduleCompileJobForNewUnits(); void ScheduleCompileJobForNewUnits(int new_units);
size_t NumOutstandingCompilations() const; size_t NumOutstandingCompilations() const;
...@@ -661,6 +694,19 @@ class CompilationStateImpl { ...@@ -661,6 +694,19 @@ class CompilationStateImpl {
// using relaxed semantics. // using relaxed semantics.
std::atomic<bool> compile_failed_{false}; std::atomic<bool> compile_failed_{false};
// The atomic counter is shared with the compilation job. It's increased if
// more units are added, and decreased when the queue drops to zero. Hence
// it's an approximation of the current number of available units in the
// queue, but it's not updated after popping a single unit, because that
// would create too much contention.
// This counter is not used for synchronization, hence relaxed memory ordering
// can be used. The thread that increases the counter is the same that calls
// {NotifyConcurrencyIncrease} later. The only reduction of the counter is a
// drop to zero after a worker does not find any unit in the queue, and after
// that drop another check is executed to ensure that any left-over units are
// still processed.
std::shared_ptr<std::atomic<int>> scheduled_units_approximation_ =
std::make_shared<std::atomic<int>>(0);
const int max_compile_concurrency_ = 0; const int max_compile_concurrency_ = 0;
CompilationUnitQueues compilation_unit_queues_; CompilationUnitQueues compilation_unit_queues_;
...@@ -679,7 +725,7 @@ class CompilationStateImpl { ...@@ -679,7 +725,7 @@ class CompilationStateImpl {
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Protected by {mutex_}: // Protected by {mutex_}:
std::shared_ptr<JobHandle> current_compile_job_; std::shared_ptr<ThreadSafeJobHandle> current_compile_job_;
// Features detected to be used in this module. Features can be detected // Features detected to be used in this module. Features can be detected
// as a module is being compiled. // as a module is being compiled.
...@@ -1588,31 +1634,51 @@ void CompileNativeModule(Isolate* isolate, ...@@ -1588,31 +1634,51 @@ void CompileNativeModule(Isolate* isolate,
// The runnable task that performs compilations in the background. // The runnable task that performs compilations in the background.
class BackgroundCompileJob : public JobTask { class BackgroundCompileJob : public JobTask {
public: public:
explicit BackgroundCompileJob(std::shared_ptr<BackgroundCompileToken> token, explicit BackgroundCompileJob(
std::shared_ptr<Counters> async_counters, std::shared_ptr<BackgroundCompileToken> token,
size_t max_concurrency) std::shared_ptr<Counters> async_counters,
std::shared_ptr<std::atomic<int>> scheduled_units_approximation,
size_t max_concurrency)
: token_(std::move(token)), : token_(std::move(token)),
async_counters_(std::move(async_counters)), async_counters_(std::move(async_counters)),
scheduled_units_approximation_(
std::move(scheduled_units_approximation)),
max_concurrency_(max_concurrency) {} max_concurrency_(max_concurrency) {}
void Run(JobDelegate* delegate) override { void Run(JobDelegate* delegate) override {
ExecuteCompilationUnits(token_, async_counters_.get(), delegate, if (ExecuteCompilationUnits(token_, async_counters_.get(), delegate,
kBaselineOrTopTier); kBaselineOrTopTier) == kYield) {
return;
}
// Otherwise we didn't find any more units to execute. Reduce the atomic
// counter of the approximated number of available units to zero, but then
// check whether any more units were added in the meantime, and increase
// back if necessary.
scheduled_units_approximation_->store(0, std::memory_order_relaxed);
BackgroundCompileScope scope(token_);
if (scope.cancelled()) return;
size_t outstanding_units =
scope.compilation_state()->NumOutstandingCompilations();
if (outstanding_units == 0) return;
// On a race between this thread and the thread which scheduled the units,
// this might increase concurrency more than needed, which is fine. It
// will be reduced again when the first task finds no more work to do.
scope.compilation_state()->ScheduleCompileJobForNewUnits(
static_cast<int>(outstanding_units));
} }
size_t GetMaxConcurrency(size_t worker_count) const override { size_t GetMaxConcurrency(size_t worker_count) const override {
BackgroundCompileScope scope(token_); // {current_concurrency_} does not reflect the units that running workers
if (scope.cancelled()) return 0; // are processing, thus add the current worker count to that number.
// NumOutstandingCompilations() does not reflect the units that running return std::min(max_concurrency_,
// workers are processing, thus add the current worker count to that number. worker_count + scheduled_units_approximation_->load());
return std::min<size_t>(
max_concurrency_,
worker_count + scope.compilation_state()->NumOutstandingCompilations());
} }
private: private:
const std::shared_ptr<BackgroundCompileToken> token_; const std::shared_ptr<BackgroundCompileToken> token_;
const std::shared_ptr<Counters> async_counters_; const std::shared_ptr<Counters> async_counters_;
const std::shared_ptr<std::atomic<int>> scheduled_units_approximation_;
const size_t max_concurrency_; const size_t max_concurrency_;
}; };
...@@ -2937,7 +3003,10 @@ void CompilationStateImpl::AddCompilationUnits( ...@@ -2937,7 +3003,10 @@ void CompilationStateImpl::AddCompilationUnits(
js_to_wasm_wrapper_units_.insert(js_to_wasm_wrapper_units_.end(), js_to_wasm_wrapper_units_.insert(js_to_wasm_wrapper_units_.end(),
js_to_wasm_wrapper_units.begin(), js_to_wasm_wrapper_units.begin(),
js_to_wasm_wrapper_units.end()); js_to_wasm_wrapper_units.end());
ScheduleCompileJobForNewUnits();
size_t total_units = baseline_units.size() + top_tier_units.size() +
js_to_wasm_wrapper_units.size();
ScheduleCompileJobForNewUnits(static_cast<int>(total_units));
} }
void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) { void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) {
...@@ -2947,7 +3016,7 @@ void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) { ...@@ -2947,7 +3016,7 @@ void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) {
void CompilationStateImpl::AddTopTierPriorityCompilationUnit( void CompilationStateImpl::AddTopTierPriorityCompilationUnit(
WasmCompilationUnit unit, size_t priority) { WasmCompilationUnit unit, size_t priority) {
compilation_unit_queues_.AddTopTierPriorityUnit(unit, priority); compilation_unit_queues_.AddTopTierPriorityUnit(unit, priority);
ScheduleCompileJobForNewUnits(); ScheduleCompileJobForNewUnits(1);
} }
std::shared_ptr<JSToWasmWrapperCompilationUnit> std::shared_ptr<JSToWasmWrapperCompilationUnit>
...@@ -3172,20 +3241,35 @@ void CompilationStateImpl::PublishDetectedFeatures(Isolate* isolate) { ...@@ -3172,20 +3241,35 @@ void CompilationStateImpl::PublishDetectedFeatures(Isolate* isolate) {
UpdateFeatureUseCounts(isolate, detected_features_); UpdateFeatureUseCounts(isolate, detected_features_);
} }
void CompilationStateImpl::ScheduleCompileJobForNewUnits() { void CompilationStateImpl::ScheduleCompileJobForNewUnits(int new_units) {
// Increase the {scheduled_units_approximation_} counter and remember the old
// value to check whether it increased towards {max_compile_concurrency_}.
// In that case, we need to notify the compile job about the increased
// concurrency.
DCHECK_LT(0, new_units);
int old_units = scheduled_units_approximation_->fetch_add(
new_units, std::memory_order_relaxed);
bool concurrency_increased = old_units < max_compile_concurrency_;
base::MutexGuard guard(&mutex_);
if (current_compile_job_ && current_compile_job_->IsRunning()) { if (current_compile_job_ && current_compile_job_->IsRunning()) {
current_compile_job_->NotifyConcurrencyIncrease(); if (concurrency_increased) {
current_compile_job_->NotifyConcurrencyIncrease();
}
return; return;
} }
if (failed()) return; if (failed()) return;
std::unique_ptr<JobTask> new_compile_job = std::unique_ptr<JobTask> new_compile_job =
std::make_unique<BackgroundCompileJob>( std::make_unique<BackgroundCompileJob>(
background_compile_token_, async_counters_, max_compile_concurrency_); background_compile_token_, async_counters_,
scheduled_units_approximation_, max_compile_concurrency_);
// TODO(wasm): Lower priority for TurboFan-only jobs. // TODO(wasm): Lower priority for TurboFan-only jobs.
current_compile_job_ = V8::GetCurrentPlatform()->PostJob( std::shared_ptr<JobHandle> handle = V8::GetCurrentPlatform()->PostJob(
TaskPriority::kUserVisible, std::move(new_compile_job)); TaskPriority::kUserVisible, std::move(new_compile_job));
native_module_->engine()->ShepherdCompileJobHandle(current_compile_job_); native_module_->engine()->ShepherdCompileJobHandle(handle);
current_compile_job_ =
std::make_unique<ThreadSafeJobHandle>(std::move(handle));
} }
size_t CompilationStateImpl::NumOutstandingCompilations() const { size_t CompilationStateImpl::NumOutstandingCompilations() const {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment