Commit 69388737 authored by Etienne Pierre-doray, committed by Commit Bot

Reland "[wasm] Use NumOutstandingCompilations() in BackgroundCompileJob::GetMaxConcurrency()"

This is a reland of 84eec6e9

Original change's description:
> [wasm] Use NumOutstandingCompilations() in BackgroundCompileJob::GetMaxConcurrency()
>
> This simplifies current_compile_job_ since ScheduleCompileJobForNewUnits
> is only called on the main thread.
>
> From pinpoint:
> v8:wasm:sync_instantiate:wall_time: 19.1% improvement
> v8-gc-incremental-step: 20.5% improvement
> https://pinpoint-dot-chromeperf.appspot.com/job/152920d8520000
>
> Change-Id: Id560080937f5439cf3321ce9306c7cae49e74798
> Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2442383
> Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
> Reviewed-by: Clemens Backes <clemensb@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#70386}

Cq-Include-Trybots: luci.v8.try:v8_linux64_tsan_rel_ng
Cq-Include-Trybots: luci.v8.try:v8_linux64_tsan_isolates_rel_ng
Change-Id: Ic989b64f130a00ce52228cdd2f57f4c1ade354f2
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2458147
Commit-Queue: Clemens Backes <clemensb@chromium.org>
Reviewed-by: Clemens Backes <clemensb@chromium.org>
Cr-Commit-Position: refs/heads/master@{#70418}
parent 11513893
...@@ -540,39 +540,6 @@ class CompilationUnitQueues { ...@@ -540,39 +540,6 @@ class CompilationUnitQueues {
} }
}; };
// {JobHandle} is not thread safe in general (at least both the
// {DefaultJobHandle} and chromium's {base::JobHandle} are not). Hence, protect
// concurrent accesses via a mutex.
class ThreadSafeJobHandle {
public:
explicit ThreadSafeJobHandle(std::shared_ptr<JobHandle> job_handle)
: job_handle_(std::move(job_handle)) {}
void NotifyConcurrencyIncrease() {
base::MutexGuard guard(&mutex_);
job_handle_->NotifyConcurrencyIncrease();
}
void Join() {
base::MutexGuard guard(&mutex_);
job_handle_->Join();
}
void Cancel() {
base::MutexGuard guard(&mutex_);
job_handle_->Cancel();
}
bool IsRunning() const {
base::MutexGuard guard(&mutex_);
return job_handle_->IsRunning();
}
private:
mutable base::Mutex mutex_;
std::shared_ptr<JobHandle> job_handle_;
};
// The {CompilationStateImpl} keeps track of the compilation state of the // The {CompilationStateImpl} keeps track of the compilation state of the
// owning NativeModule, i.e. which functions are left to be compiled. // owning NativeModule, i.e. which functions are left to be compiled.
// It contains a task manager to allow parallel and asynchronous background // It contains a task manager to allow parallel and asynchronous background
...@@ -635,7 +602,7 @@ class CompilationStateImpl { ...@@ -635,7 +602,7 @@ class CompilationStateImpl {
void PublishDetectedFeatures(Isolate*); void PublishDetectedFeatures(Isolate*);
// Ensure that a compilation job is running, and increase its concurrency if // Ensure that a compilation job is running, and increase its concurrency if
// needed. // needed.
void ScheduleCompileJobForNewUnits(int new_units); void ScheduleCompileJobForNewUnits();
size_t NumOutstandingCompilations() const; size_t NumOutstandingCompilations() const;
...@@ -694,19 +661,6 @@ class CompilationStateImpl { ...@@ -694,19 +661,6 @@ class CompilationStateImpl {
// using relaxed semantics. // using relaxed semantics.
std::atomic<bool> compile_failed_{false}; std::atomic<bool> compile_failed_{false};
// The atomic counter is shared with the compilation job. It's increased if
// more units are added, and decreased when the queue drops to zero. Hence
// it's an approximation of the current number of available units in the
// queue, but it's not updated after popping a single unit, because that
// would create too much contention.
// This counter is not used for synchronization, hence relaxed memory ordering
// can be used. The thread that increases the counter is the same that calls
// {NotifyConcurrencyIncrease} later. The only reduction of the counter is a
// drop to zero after a worker does not find any unit in the queue, and after
// that drop another check is executed to ensure that any left-over units are
// still processed.
std::shared_ptr<std::atomic<int>> scheduled_units_approximation_ =
std::make_shared<std::atomic<int>>(0);
const int max_compile_concurrency_ = 0; const int max_compile_concurrency_ = 0;
CompilationUnitQueues compilation_unit_queues_; CompilationUnitQueues compilation_unit_queues_;
...@@ -725,7 +679,7 @@ class CompilationStateImpl { ...@@ -725,7 +679,7 @@ class CompilationStateImpl {
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Protected by {mutex_}: // Protected by {mutex_}:
std::shared_ptr<ThreadSafeJobHandle> current_compile_job_; std::shared_ptr<JobHandle> current_compile_job_;
// Features detected to be used in this module. Features can be detected // Features detected to be used in this module. Features can be detected
// as a module is being compiled. // as a module is being compiled.
...@@ -1634,51 +1588,31 @@ void CompileNativeModule(Isolate* isolate, ...@@ -1634,51 +1588,31 @@ void CompileNativeModule(Isolate* isolate,
// The runnable task that performs compilations in the background. // The runnable task that performs compilations in the background.
class BackgroundCompileJob : public JobTask { class BackgroundCompileJob : public JobTask {
public: public:
explicit BackgroundCompileJob( explicit BackgroundCompileJob(std::shared_ptr<BackgroundCompileToken> token,
std::shared_ptr<BackgroundCompileToken> token, std::shared_ptr<Counters> async_counters,
std::shared_ptr<Counters> async_counters, size_t max_concurrency)
std::shared_ptr<std::atomic<int>> scheduled_units_approximation,
size_t max_concurrency)
: token_(std::move(token)), : token_(std::move(token)),
async_counters_(std::move(async_counters)), async_counters_(std::move(async_counters)),
scheduled_units_approximation_(
std::move(scheduled_units_approximation)),
max_concurrency_(max_concurrency) {} max_concurrency_(max_concurrency) {}
void Run(JobDelegate* delegate) override { void Run(JobDelegate* delegate) override {
if (ExecuteCompilationUnits(token_, async_counters_.get(), delegate, ExecuteCompilationUnits(token_, async_counters_.get(), delegate,
kBaselineOrTopTier) == kYield) { kBaselineOrTopTier);
return;
}
// Otherwise we didn't find any more units to execute. Reduce the atomic
// counter of the approximated number of available units to zero, but then
// check whether any more units were added in the meantime, and increase
// back if necessary.
scheduled_units_approximation_->store(0, std::memory_order_relaxed);
BackgroundCompileScope scope(token_);
if (scope.cancelled()) return;
size_t outstanding_units =
scope.compilation_state()->NumOutstandingCompilations();
if (outstanding_units == 0) return;
// On a race between this thread and the thread which scheduled the units,
// this might increase concurrency more than needed, which is fine. It
// will be reduced again when the first task finds no more work to do.
scope.compilation_state()->ScheduleCompileJobForNewUnits(
static_cast<int>(outstanding_units));
} }
size_t GetMaxConcurrency(size_t worker_count) const override { size_t GetMaxConcurrency(size_t worker_count) const override {
// {current_concurrency_} does not reflect the units that running workers BackgroundCompileScope scope(token_);
// are processing, thus add the current worker count to that number. if (scope.cancelled()) return 0;
return std::min(max_concurrency_, // NumOutstandingCompilations() does not reflect the units that running
worker_count + scheduled_units_approximation_->load()); // workers are processing, thus add the current worker count to that number.
return std::min<size_t>(
max_concurrency_,
worker_count + scope.compilation_state()->NumOutstandingCompilations());
} }
private: private:
const std::shared_ptr<BackgroundCompileToken> token_; const std::shared_ptr<BackgroundCompileToken> token_;
const std::shared_ptr<Counters> async_counters_; const std::shared_ptr<Counters> async_counters_;
const std::shared_ptr<std::atomic<int>> scheduled_units_approximation_;
const size_t max_concurrency_; const size_t max_concurrency_;
}; };
...@@ -2939,7 +2873,9 @@ void CompilationStateImpl::InitializeRecompilation( ...@@ -2939,7 +2873,9 @@ void CompilationStateImpl::InitializeRecompilation(
// start yet, and new code will be kept tiered-down from the start. For // start yet, and new code will be kept tiered-down from the start. For
// streaming compilation, there is a special path to tier down later, when // streaming compilation, there is a special path to tier down later, when
// the module is complete. In any case, we don't need to recompile here. // the module is complete. In any case, we don't need to recompile here.
base::Optional<CompilationUnitBuilder> builder;
if (compilation_progress_.size() > 0) { if (compilation_progress_.size() > 0) {
builder.emplace(native_module_);
const WasmModule* module = native_module_->module(); const WasmModule* module = native_module_->module();
DCHECK_EQ(module->num_declared_functions, compilation_progress_.size()); DCHECK_EQ(module->num_declared_functions, compilation_progress_.size());
DCHECK_GE(module->num_declared_functions, DCHECK_GE(module->num_declared_functions,
...@@ -2954,15 +2890,13 @@ void CompilationStateImpl::InitializeRecompilation( ...@@ -2954,15 +2890,13 @@ void CompilationStateImpl::InitializeRecompilation(
: ExecutionTier::kTurbofan; : ExecutionTier::kTurbofan;
int imported = module->num_imported_functions; int imported = module->num_imported_functions;
// Generate necessary compilation units on the fly. // Generate necessary compilation units on the fly.
CompilationUnitBuilder builder(native_module_);
for (int function_index : recompile_function_indexes) { for (int function_index : recompile_function_indexes) {
DCHECK_LE(imported, function_index); DCHECK_LE(imported, function_index);
int slot_index = function_index - imported; int slot_index = function_index - imported;
auto& progress = compilation_progress_[slot_index]; auto& progress = compilation_progress_[slot_index];
progress = MissingRecompilationField::update(progress, true); progress = MissingRecompilationField::update(progress, true);
builder.AddRecompilationUnit(function_index, new_tier); builder->AddRecompilationUnit(function_index, new_tier);
} }
builder.Commit();
} }
// Trigger callback if module needs no recompilation. // Trigger callback if module needs no recompilation.
...@@ -2970,6 +2904,12 @@ void CompilationStateImpl::InitializeRecompilation( ...@@ -2970,6 +2904,12 @@ void CompilationStateImpl::InitializeRecompilation(
TriggerCallbacks(base::EnumSet<CompilationEvent>( TriggerCallbacks(base::EnumSet<CompilationEvent>(
{CompilationEvent::kFinishedRecompilation})); {CompilationEvent::kFinishedRecompilation}));
} }
if (builder.has_value()) {
// Avoid holding lock while scheduling a compile job.
guard.reset();
builder->Commit();
}
} }
void CompilationStateImpl::AddCallback(CompilationState::callback_t callback) { void CompilationStateImpl::AddCallback(CompilationState::callback_t callback) {
...@@ -3003,10 +2943,7 @@ void CompilationStateImpl::AddCompilationUnits( ...@@ -3003,10 +2943,7 @@ void CompilationStateImpl::AddCompilationUnits(
js_to_wasm_wrapper_units_.insert(js_to_wasm_wrapper_units_.end(), js_to_wasm_wrapper_units_.insert(js_to_wasm_wrapper_units_.end(),
js_to_wasm_wrapper_units.begin(), js_to_wasm_wrapper_units.begin(),
js_to_wasm_wrapper_units.end()); js_to_wasm_wrapper_units.end());
ScheduleCompileJobForNewUnits();
size_t total_units = baseline_units.size() + top_tier_units.size() +
js_to_wasm_wrapper_units.size();
ScheduleCompileJobForNewUnits(static_cast<int>(total_units));
} }
void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) { void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) {
...@@ -3016,7 +2953,7 @@ void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) { ...@@ -3016,7 +2953,7 @@ void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) {
void CompilationStateImpl::AddTopTierPriorityCompilationUnit( void CompilationStateImpl::AddTopTierPriorityCompilationUnit(
WasmCompilationUnit unit, size_t priority) { WasmCompilationUnit unit, size_t priority) {
compilation_unit_queues_.AddTopTierPriorityUnit(unit, priority); compilation_unit_queues_.AddTopTierPriorityUnit(unit, priority);
ScheduleCompileJobForNewUnits(1); ScheduleCompileJobForNewUnits();
} }
std::shared_ptr<JSToWasmWrapperCompilationUnit> std::shared_ptr<JSToWasmWrapperCompilationUnit>
...@@ -3241,35 +3178,20 @@ void CompilationStateImpl::PublishDetectedFeatures(Isolate* isolate) { ...@@ -3241,35 +3178,20 @@ void CompilationStateImpl::PublishDetectedFeatures(Isolate* isolate) {
UpdateFeatureUseCounts(isolate, detected_features_); UpdateFeatureUseCounts(isolate, detected_features_);
} }
void CompilationStateImpl::ScheduleCompileJobForNewUnits(int new_units) { void CompilationStateImpl::ScheduleCompileJobForNewUnits() {
// Increase the {scheduled_units_approximation_} counter and remember the old
// value to check whether it increased towards {max_compile_concurrency_}.
// In that case, we need to notify the compile job about the increased
// concurrency.
DCHECK_LT(0, new_units);
int old_units = scheduled_units_approximation_->fetch_add(
new_units, std::memory_order_relaxed);
bool concurrency_increased = old_units < max_compile_concurrency_;
base::MutexGuard guard(&mutex_);
if (current_compile_job_ && current_compile_job_->IsRunning()) { if (current_compile_job_ && current_compile_job_->IsRunning()) {
if (concurrency_increased) { current_compile_job_->NotifyConcurrencyIncrease();
current_compile_job_->NotifyConcurrencyIncrease();
}
return; return;
} }
if (failed()) return; if (failed()) return;
std::unique_ptr<JobTask> new_compile_job = std::unique_ptr<JobTask> new_compile_job =
std::make_unique<BackgroundCompileJob>( std::make_unique<BackgroundCompileJob>(
background_compile_token_, async_counters_, background_compile_token_, async_counters_, max_compile_concurrency_);
scheduled_units_approximation_, max_compile_concurrency_);
// TODO(wasm): Lower priority for TurboFan-only jobs. // TODO(wasm): Lower priority for TurboFan-only jobs.
std::shared_ptr<JobHandle> handle = V8::GetCurrentPlatform()->PostJob( current_compile_job_ = V8::GetCurrentPlatform()->PostJob(
TaskPriority::kUserVisible, std::move(new_compile_job)); TaskPriority::kUserVisible, std::move(new_compile_job));
native_module_->engine()->ShepherdCompileJobHandle(handle); native_module_->engine()->ShepherdCompileJobHandle(current_compile_job_);
current_compile_job_ =
std::make_unique<ThreadSafeJobHandle>(std::move(handle));
} }
size_t CompilationStateImpl::NumOutstandingCompilations() const { size_t CompilationStateImpl::NumOutstandingCompilations() const {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment