Commit c61ac3d2 authored by Clemens Hammacher, committed by Commit Bot

Reland "[wasm] Use work-stealing queues for background compilation"

This is a reland of d746be9c

Original change's description:
> [wasm] Use work-stealing queues for background compilation
> 
> This reduces contention on the mutex protecting the {CompilationState}
> by splitting the compilation unit queues into several queues (one per
> background task). Each task executes its own queue first, and steals
> from other queues once it runs out of work.
> The implementation of the set of work-stealing queues is encapsulated
> in the new {CompilationUnitQueues} class in module-compiler.cc.
> 
> R=titzer@chromium.org
> 
> Bug: v8:8916
> Change-Id: I5a40314917e7d4a35d7ff9e8ec124ec212beacab
> Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/1543350
> Commit-Queue: Clemens Hammacher <clemensh@chromium.org>
> Reviewed-by: Michael Starzinger <mstarzinger@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#60572}

Bug: v8:8916
Change-Id: Ic0cbad0ddc31be24715c5490b9ec71a39186fd3b
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/1549172
Commit-Queue: Clemens Hammacher <clemensh@chromium.org>
Reviewed-by: Michael Starzinger <mstarzinger@chromium.org>
Cr-Commit-Position: refs/heads/master@{#60653}
parent fd2b56f7
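
Before reading the diff, here is a minimal standalone sketch of the scheme the commit message describes: one queue per task, each worker drains its own queue first and probes the others when it runs dry. All names ({WorkQueues}, {Job}) are invented for illustration; this is not code from this CL.

#include <deque>
#include <mutex>
#include <optional>
#include <vector>

using Job = int;  // Stand-in for a compilation unit.

class WorkQueues {
 public:
  explicit WorkQueues(int num_tasks) : queues_(num_tasks) {}

  void Add(int task_id, Job job) {
    std::lock_guard<std::mutex> guard(queues_[task_id].mutex);
    queues_[task_id].jobs.push_back(job);
  }

  // Own queue first; if it is empty, probe every other queue once.
  std::optional<Job> GetNext(int task_id) {
    int n = static_cast<int>(queues_.size());
    for (int i = 0; i < n; ++i) {
      if (auto job = PopFrom((task_id + i) % n)) return job;
    }
    return std::nullopt;  // Every queue is empty.
  }

 private:
  struct Queue {
    std::mutex mutex;
    std::deque<Job> jobs;
  };

  std::optional<Job> PopFrom(int index) {
    std::lock_guard<std::mutex> guard(queues_[index].mutex);
    if (queues_[index].jobs.empty()) return std::nullopt;
    Job job = queues_[index].jobs.front();
    queues_[index].jobs.pop_front();
    return job;
  }

  std::vector<Queue> queues_;
};
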
@@ -4,10 +4,13 @@
#include "src/wasm/module-compiler.h"
#include <algorithm>
#include "src/api.h"
#include "src/asmjs/asm-js.h"
#include "src/base/enum-set.h"
#include "src/base/optional.h"
#include "src/base/platform/mutex.h"
#include "src/base/template-utils.h"
#include "src/base/utils/random-number-generator.h"
#include "src/compiler/wasm-compiler.h"
@@ -111,6 +114,170 @@ class BackgroundCompileScope {
std::shared_ptr<NativeModule> const native_module_;
};
// A set of work-stealing queues (vectors of units). Each background compile
// task owns one of the queues and steals from all others once its own queue
// runs empty.
class CompilationUnitQueues {
public:
explicit CompilationUnitQueues(int max_tasks) : queues_(max_tasks) {
DCHECK_LT(0, max_tasks);
for (int task_id = 0; task_id < max_tasks; ++task_id) {
queues_[task_id].next_steal_task_id_ = next_task_id(task_id);
}
for (auto& atomic_counter : num_units_) {
std::atomic_init(&atomic_counter, size_t{0});
}
}
std::unique_ptr<WasmCompilationUnit> GetNextUnit(int task_id) {
DCHECK_LE(0, task_id);
DCHECK_GT(queues_.size(), task_id);
// As long as any lower-tier units are outstanding we need to steal them
// before executing our own higher-tier units.
for (int tier = GetLowestTierWithUnits(); tier < kNumTiers; ++tier) {
Queue* queue = &queues_[task_id];
// First, check whether our own queue has a unit of the wanted tier. If
// so, return it, otherwise get the task id to steal from.
int steal_task_id;
{
base::MutexGuard mutex_guard(&queue->mutex_);
if (!queue->units_[tier].empty()) {
auto unit = std::move(queue->units_[tier].back());
queue->units_[tier].pop_back();
DecrementUnitCount(tier);
return unit;
}
steal_task_id = queue->next_steal_task_id_;
}
// Try to steal from all other queues. If none of the steals succeeds, the
// outer loop increases the tier and retries.
size_t steal_trials = queues_.size();
for (; steal_trials > 0;
--steal_trials, steal_task_id = next_task_id(steal_task_id)) {
if (steal_task_id == task_id) continue;
if (auto unit = StealUnitsAndGetFirst(task_id, steal_task_id, tier)) {
DecrementUnitCount(tier);
return unit;
}
}
}
return {};
}
void AddUnits(Vector<std::unique_ptr<WasmCompilationUnit>> baseline_units,
Vector<std::unique_ptr<WasmCompilationUnit>> top_tier_units) {
DCHECK_LT(0, baseline_units.size() + top_tier_units.size());
// Add to the individual queues in a round-robin fashion. No special care is
// taken to balance them; they will be balanced by work stealing.
int queue_to_add = next_queue_to_add.load(std::memory_order_relaxed);
while (!next_queue_to_add.compare_exchange_weak(
queue_to_add, next_task_id(queue_to_add), std::memory_order_relaxed)) {
// Retry with updated {queue_to_add}.
}
Queue* queue = &queues_[queue_to_add];
base::MutexGuard guard(&queue->mutex_);
if (!baseline_units.empty()) {
queue->units_[kBaseline].insert(
queue->units_[kBaseline].end(),
std::make_move_iterator(baseline_units.begin()),
std::make_move_iterator(baseline_units.end()));
num_units_[kBaseline].fetch_add(baseline_units.size(),
std::memory_order_relaxed);
}
if (!top_tier_units.empty()) {
queue->units_[kTopTier].insert(
queue->units_[kTopTier].end(),
std::make_move_iterator(top_tier_units.begin()),
std::make_move_iterator(top_tier_units.end()));
num_units_[kTopTier].fetch_add(top_tier_units.size(),
std::memory_order_relaxed);
}
}
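
The compare_exchange_weak loop in {AddUnits} is a wrap-around round-robin counter: a plain fetch_add would grow without bound, while the CAS lets the writer install {next_task_id(queue_to_add)} directly. The same idiom in isolation ({NextRoundRobinSlot} is an invented name for this sketch):

#include <atomic>

// Returns the current slot and atomically advances the counter, wrapping
// at {num_slots}. On CAS failure {slot} is reloaded with the current
// value, so the loop simply retries.
int NextRoundRobinSlot(std::atomic<int>* counter, int num_slots) {
  int slot = counter->load(std::memory_order_relaxed);
  while (!counter->compare_exchange_weak(slot, (slot + 1) % num_slots,
                                         std::memory_order_relaxed)) {
    // Retry with the updated {slot}.
  }
  return slot;
}
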
// Get the current total number of units in all queues. This is only a
// momentary snapshot; even if it returns non-zero, {GetNextUnit} is not
// guaranteed to return a unit.
size_t GetTotalSize() const {
size_t total = 0;
for (auto& atomic_counter : num_units_) {
total += atomic_counter.load(std::memory_order_relaxed);
}
return total;
}
private:
// Store tier in int so we can easily loop over it:
static constexpr int kBaseline = 0;
static constexpr int kTopTier = 1;
static constexpr int kNumTiers = kTopTier + 1;
struct Queue {
base::Mutex mutex_;
// Protected by {mutex_}:
std::vector<std::unique_ptr<WasmCompilationUnit>> units_[kNumTiers];
int next_steal_task_id_;
// End of fields protected by {mutex_}.
};
std::vector<Queue> queues_;
std::atomic<size_t> num_units_[kNumTiers];
std::atomic<int> next_queue_to_add{0};
int next_task_id(int task_id) const {
int next = task_id + 1;
return next == static_cast<int>(queues_.size()) ? 0 : next;
}
int GetLowestTierWithUnits() const {
for (int tier = 0; tier < kNumTiers; ++tier) {
if (num_units_[tier].load(std::memory_order_relaxed) > 0) return tier;
}
return kNumTiers;
}
void DecrementUnitCount(int tier) {
size_t old_units_count = num_units_[tier].fetch_sub(1);
DCHECK_LE(1, old_units_count);
USE(old_units_count);
}
// Steal units of {wanted_tier} from {steal_from_task_id} to {task_id}. Return
// first stolen unit (rest put in queue of {task_id}), or {nullptr} if
// {steal_from_task_id} had no units of {wanted_tier}.
std::unique_ptr<WasmCompilationUnit> StealUnitsAndGetFirst(
int task_id, int steal_from_task_id, int wanted_tier) {
DCHECK_NE(task_id, steal_from_task_id);
std::vector<std::unique_ptr<WasmCompilationUnit>> stolen;
{
Queue* steal_queue = &queues_[steal_from_task_id];
base::MutexGuard guard(&steal_queue->mutex_);
if (steal_queue->units_[wanted_tier].empty()) return {};
auto* steal_from_vector = &steal_queue->units_[wanted_tier];
size_t remaining = steal_from_vector->size() / 2;
stolen.assign(
std::make_move_iterator(steal_from_vector->begin()) + remaining,
std::make_move_iterator(steal_from_vector->end()));
steal_from_vector->resize(remaining);
}
DCHECK(!stolen.empty());
auto returned_unit = std::move(stolen.back());
stolen.pop_back();
Queue* queue = &queues_[task_id];
base::MutexGuard guard(&queue->mutex_);
auto* target_queue = &queue->units_[wanted_tier];
target_queue->insert(target_queue->end(),
std::make_move_iterator(stolen.begin()),
std::make_move_iterator(stolen.end()));
queue->next_steal_task_id_ = next_task_id(steal_from_task_id);
return returned_unit;
}
};
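
For orientation, this is roughly how a background task drives the class above. A schematic sketch only, not compilable on its own: it leans on {CompilationUnitQueues} from the diff, and {CompileAndPublish} is a placeholder, not a V8 function.

// Sketch: a background task's main loop against CompilationUnitQueues.
void BackgroundTaskLoop(CompilationUnitQueues* queues, int task_id) {
  while (std::unique_ptr<WasmCompilationUnit> unit =
             queues->GetNextUnit(task_id)) {
    CompileAndPublish(std::move(unit));  // Placeholder for the real work.
  }
  // nullptr means every queue was empty at every tier: the task stops and
  // is restarted once new units are added.
}
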
// The {CompilationStateImpl} keeps track of the compilation state of the
// owning NativeModule, i.e. which functions are left to be compiled.
// It contains a task manager to allow parallel and asynchronous background
@@ -137,18 +304,16 @@ class CompilationStateImpl {
// Inserts new functions to compile and kicks off compilation.
void AddCompilationUnits(
std::vector<std::unique_ptr<WasmCompilationUnit>>& baseline_units,
std::vector<std::unique_ptr<WasmCompilationUnit>>& top_tier_units);
Vector<std::unique_ptr<WasmCompilationUnit>> baseline_units,
Vector<std::unique_ptr<WasmCompilationUnit>> top_tier_units);
void AddTopTierCompilationUnit(std::unique_ptr<WasmCompilationUnit>);
std::unique_ptr<WasmCompilationUnit> GetNextCompilationUnit();
std::unique_ptr<WasmCompilationUnit> GetNextCompilationUnit(int task_id);
void OnFinishedUnit(WasmCode*);
void OnFinishedUnits(Vector<WasmCode*>);
void ReportDetectedFeatures(const WasmFeatures& detected);
void OnBackgroundTaskStopped(const WasmFeatures& detected);
void OnBackgroundTaskStopped(int task_id, const WasmFeatures& detected);
void PublishDetectedFeatures(Isolate* isolate, const WasmFeatures& detected);
void RestartBackgroundCompileTask();
void RestartBackgroundTasks();
void SetError();
@@ -188,6 +353,10 @@ class CompilationStateImpl {
// using relaxed semantics.
std::atomic<bool> compile_failed_{false};
const int max_background_tasks_ = 0;
CompilationUnitQueues compilation_unit_queues_;
// This mutex protects all information of this {CompilationStateImpl} which is
// being accessed concurrently.
mutable base::Mutex mutex_;
@@ -195,10 +364,8 @@ class CompilationStateImpl {
//////////////////////////////////////////////////////////////////////////////
// Protected by {mutex_}:
std::vector<std::unique_ptr<WasmCompilationUnit>> baseline_compilation_units_;
std::vector<std::unique_ptr<WasmCompilationUnit>> top_tier_compilation_units_;
int num_background_tasks_ = 0;
// Set of unused task ids; <= {max_background_tasks_} many.
std::vector<int> available_task_ids_;
// Features detected to be used in this module. Features can be detected
// as a module is being compiled.
@@ -229,8 +396,6 @@ class CompilationStateImpl {
// End of fields protected by {callbacks_mutex_}.
//////////////////////////////////////////////////////////////////////////////
const int max_background_tasks_ = 0;
};
CompilationStateImpl* Impl(CompilationState* compilation_state) {
@@ -407,7 +572,8 @@ class CompilationUnitBuilder {
bool Commit() {
if (baseline_units_.empty() && tiering_units_.empty()) return false;
compilation_state()->AddCompilationUnits(baseline_units_, tiering_units_);
compilation_state()->AddCompilationUnits(VectorOf(baseline_units_),
VectorOf(tiering_units_));
Clear();
return true;
}
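
{Commit} now hands the builder's vectors to {AddCompilationUnits} as {Vector} views rather than by reference, so the callee can consume the units without caring about the container. A minimal sketch of such a non-owning view adapter; this illustrates the concept only and is not V8's actual {Vector}/{VectorOf} definition.

#include <cstddef>
#include <vector>

// Non-owning (pointer, length) view over contiguous storage.
template <typename T>
struct View {
  T* data;
  size_t size;
};

// Adapts any contiguous container without copying its elements.
template <typename Container>
View<typename Container::value_type> ViewOf(Container& container) {
  return {container.data(), container.size()};
}
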
@@ -515,8 +681,8 @@ double MonotonicallyIncreasingTimeInMs() {
base::Time::kMillisecondsPerSecond;
}
// Run by each compilation task and by the main thread (i.e. in both
// foreground and background threads).
// Run by the main thread to take part in compilation. Only used for synchronous
// compilation.
bool FetchAndExecuteCompilationUnit(CompilationEnv* env,
NativeModule* native_module,
CompilationStateImpl* compilation_state,
@@ -524,9 +690,12 @@ bool FetchAndExecuteCompilationUnit(CompilationEnv* env,
Counters* counters) {
DisallowHeapAccess no_heap_access;
// The main thread uses task id 0, which might collide with one of the
// background tasks. This is fine, as it only causes some contention on
// that one queue, but otherwise works.
constexpr int kMainThreadTaskId = 0;
std::unique_ptr<WasmCompilationUnit> unit =
compilation_state->GetNextCompilationUnit();
compilation_state->GetNextCompilationUnit(kMainThreadTaskId);
if (unit == nullptr) return false;
WasmCompilationResult result = unit->ExecuteCompilation(
@@ -642,7 +811,7 @@ void CompileInParallel(Isolate* isolate, NativeModule* native_module) {
// 1) The main thread allocates a compilation unit for each wasm function
// and stores them in the vector {compilation_units} within the
// {compilation_state}. By adding units to the {compilation_state}, new
// {BackgroundCompileTasks} instances are spawned which run on
// {BackgroundCompileTask} instances are spawned which run on
// the background threads.
// 2) The background threads and the main thread pick one compilation unit at
// a time and execute the parallel phase of the compilation unit.
@@ -752,10 +921,12 @@ class BackgroundCompileTask : public CancelableTask {
public:
explicit BackgroundCompileTask(CancelableTaskManager* manager,
std::shared_ptr<BackgroundCompileToken> token,
std::shared_ptr<Counters> async_counters)
std::shared_ptr<Counters> async_counters,
int task_id)
: CancelableTask(manager),
token_(std::move(token)),
async_counters_(std::move(async_counters)) {}
async_counters_(std::move(async_counters)),
task_id_(task_id) {}
void RunInternal() override {
TRACE_COMPILE("(3b) Compiling...\n");
@@ -780,10 +951,11 @@ class BackgroundCompileTask : public CancelableTask {
env.emplace(compile_scope.native_module()->CreateCompilationEnv());
wire_bytes = compile_scope.compilation_state()->GetWireBytesStorage();
module = compile_scope.native_module()->shared_module();
unit = compile_scope.compilation_state()->GetNextCompilationUnit();
unit =
compile_scope.compilation_state()->GetNextCompilationUnit(task_id_);
if (unit == nullptr) {
compile_scope.compilation_state()->OnBackgroundTaskStopped(
detected_features);
task_id_, detected_features);
return;
}
}
@@ -818,7 +990,7 @@ class BackgroundCompileTask : public CancelableTask {
// Compile error.
compile_scope.compilation_state()->SetError();
compile_scope.compilation_state()->OnBackgroundTaskStopped(
detected_features);
task_id_, detected_features);
compilation_failed = true;
break;
}
@@ -827,20 +999,18 @@ class BackgroundCompileTask : public CancelableTask {
publish_results(&compile_scope);
}
// Get next unit.
if (deadline < MonotonicallyIncreasingTimeInMs()) {
publish_results(&compile_scope);
compile_scope.compilation_state()->ReportDetectedFeatures(
detected_features);
compile_scope.compilation_state()->RestartBackgroundCompileTask();
return;
unit = nullptr;
} else {
unit = compile_scope.compilation_state()->GetNextCompilationUnit(
task_id_);
}
// Get next unit.
unit = compile_scope.compilation_state()->GetNextCompilationUnit();
if (unit == nullptr) {
publish_results(&compile_scope);
compile_scope.compilation_state()->OnBackgroundTaskStopped(
detected_features);
task_id_, detected_features);
return;
}
}
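
The restructured loop above folds the deadline case into the normal "out of work" exit: once the deadline passes, {unit} is set to nullptr instead of returning early, so publishing results and reporting the stopped task happen on a single path. The control-flow pattern in a standalone sketch, with all names invented:

#include <chrono>
#include <functional>
#include <optional>

// Drain {get_next} until the work runs out or {deadline} passes; both
// cases reach the same single exit.
void RunUntilDeadline(const std::function<std::optional<int>()>& get_next,
                      const std::function<void(int)>& execute,
                      std::chrono::steady_clock::time_point deadline) {
  std::optional<int> work = get_next();
  while (work.has_value()) {
    execute(*work);
    // Past the deadline? Pretend the queue is empty so the common exit
    // below runs instead of duplicating the cleanup in a second return.
    work = (std::chrono::steady_clock::now() >= deadline)
               ? std::optional<int>{}
               : get_next();
  }
  // Common exit: publish results, report the task as stopped.
}
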
@@ -854,6 +1024,7 @@ class BackgroundCompileTask : public CancelableTask {
private:
std::shared_ptr<BackgroundCompileToken> token_;
std::shared_ptr<Counters> async_counters_;
const int task_id_;
};
} // namespace
@@ -1676,7 +1847,15 @@ CompilationStateImpl::CompilationStateImpl(
async_counters_(std::move(async_counters)),
max_background_tasks_(std::max(
1, std::min(FLAG_wasm_num_compilation_tasks,
V8::GetCurrentPlatform()->NumberOfWorkerThreads()))) {}
V8::GetCurrentPlatform()->NumberOfWorkerThreads()))),
compilation_unit_queues_(max_background_tasks_),
available_task_ids_(max_background_tasks_) {
for (int i = 0; i < max_background_tasks_; ++i) {
// Ids are popped on task creation, so reverse this list. This ensures that
// the first background task gets id 0.
available_task_ids_[i] = max_background_tasks_ - 1 - i;
}
}
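
Task ids now come from a small pool: the constructor fills {available_task_ids_} in descending order because {RestartBackgroundTasks} pops ids from the back, so the first spawned task receives id 0. The same fill in isolation, as a sketch with an invented name:

#include <vector>

// Builds {max_tasks-1, ..., 1, 0}; back() is 0, so popping from the back
// hands out ids in ascending order.
std::vector<int> MakeTaskIdPool(int max_tasks) {
  std::vector<int> pool(max_tasks);
  for (int i = 0; i < max_tasks; ++i) pool[i] = max_tasks - 1 - i;
  return pool;
}
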
void CompilationStateImpl::AbortCompilation() {
background_compile_token_->Cancel();
@@ -1702,63 +1881,21 @@ void CompilationStateImpl::AddCallback(CompilationState::callback_t callback) {
}
void CompilationStateImpl::AddCompilationUnits(
std::vector<std::unique_ptr<WasmCompilationUnit>>& baseline_units,
std::vector<std::unique_ptr<WasmCompilationUnit>>& top_tier_units) {
{
base::MutexGuard guard(&mutex_);
DCHECK_IMPLIES(compile_mode_ == CompileMode::kRegular,
top_tier_compilation_units_.empty());
baseline_compilation_units_.insert(
baseline_compilation_units_.end(),
std::make_move_iterator(baseline_units.begin()),
std::make_move_iterator(baseline_units.end()));
if (!top_tier_units.empty()) {
top_tier_compilation_units_.insert(
top_tier_compilation_units_.end(),
std::make_move_iterator(top_tier_units.begin()),
std::make_move_iterator(top_tier_units.end()));
}
}
Vector<std::unique_ptr<WasmCompilationUnit>> baseline_units,
Vector<std::unique_ptr<WasmCompilationUnit>> top_tier_units) {
compilation_unit_queues_.AddUnits(baseline_units, top_tier_units);
RestartBackgroundTasks();
}
void CompilationStateImpl::AddTopTierCompilationUnit(
std::unique_ptr<WasmCompilationUnit> unit) {
{
base::MutexGuard guard(&mutex_);
DCHECK_EQ(compile_mode_, CompileMode::kTiering);
DCHECK(FLAG_wasm_lazy_compilation || FLAG_asm_wasm_lazy_compilation ||
native_module_->enabled_features().compilation_hints);
top_tier_compilation_units_.emplace_back(std::move(unit));
}
RestartBackgroundTasks();
AddCompilationUnits({}, {&unit, 1});
}
std::unique_ptr<WasmCompilationUnit>
CompilationStateImpl::GetNextCompilationUnit() {
base::MutexGuard guard(&mutex_);
std::vector<std::unique_ptr<WasmCompilationUnit>>* units = nullptr;
if (!baseline_compilation_units_.empty()) {
units = &baseline_compilation_units_;
} else if (!top_tier_compilation_units_.empty()) {
units = &top_tier_compilation_units_;
} else {
return std::unique_ptr<WasmCompilationUnit>();
}
DCHECK_NOT_NULL(units);
DCHECK(!units->empty());
std::unique_ptr<WasmCompilationUnit> unit = std::move(units->back());
units->pop_back();
return unit;
CompilationStateImpl::GetNextCompilationUnit(int task_id) {
return compilation_unit_queues_.GetNextUnit(task_id);
}
void CompilationStateImpl::OnFinishedUnit(WasmCode* code) {
@@ -1847,31 +1984,21 @@ void CompilationStateImpl::OnFinishedUnits(Vector<WasmCode*> code_vector) {
}
}
void CompilationStateImpl::RestartBackgroundCompileTask() {
auto task =
native_module_->engine()->NewBackgroundCompileTask<BackgroundCompileTask>(
background_compile_token_, async_counters_);
if (baseline_compilation_finished()) {
V8::GetCurrentPlatform()->CallLowPriorityTaskOnWorkerThread(
std::move(task));
} else {
V8::GetCurrentPlatform()->CallOnWorkerThread(std::move(task));
void CompilationStateImpl::OnBackgroundTaskStopped(
int task_id, const WasmFeatures& detected) {
{
base::MutexGuard guard(&mutex_);
DCHECK_EQ(0, std::count(available_task_ids_.begin(),
available_task_ids_.end(), task_id));
DCHECK_GT(max_background_tasks_, available_task_ids_.size());
available_task_ids_.push_back(task_id);
UnionFeaturesInto(&detected_features_, detected);
}
}
void CompilationStateImpl::ReportDetectedFeatures(
const WasmFeatures& detected) {
base::MutexGuard guard(&mutex_);
UnionFeaturesInto(&detected_features_, detected);
}
void CompilationStateImpl::OnBackgroundTaskStopped(
const WasmFeatures& detected) {
base::MutexGuard guard(&mutex_);
DCHECK_LE(1, num_background_tasks_);
--num_background_tasks_;
UnionFeaturesInto(&detected_features_, detected);
// The background task could have stopped while we were adding new units, or
// because it reached its deadline. In both cases we need to restart tasks to
// avoid a potential deadlock.
RestartBackgroundTasks();
}
void CompilationStateImpl::PublishDetectedFeatures(
@@ -1885,26 +2012,39 @@ void CompilationStateImpl::PublishDetectedFeatures(
}
void CompilationStateImpl::RestartBackgroundTasks() {
int num_restart;
// Create new tasks, but only spawn them after releasing the mutex, because
// some platforms (e.g. the predictable platform) might execute tasks right
// away.
std::vector<std::unique_ptr<Task>> new_tasks;
{
base::MutexGuard guard(&mutex_);
// Explicit fast path (quite common): If no more task ids are available
// (i.e. {max_background_tasks_} tasks are already running), spawn nothing.
if (available_task_ids_.empty()) return;
// No need to restart tasks if compilation already failed.
if (failed()) return;
DCHECK_LE(num_background_tasks_, max_background_tasks_);
if (num_background_tasks_ == max_background_tasks_) return;
size_t num_compilation_units =
baseline_compilation_units_.size() + top_tier_compilation_units_.size();
num_restart = max_background_tasks_ - num_background_tasks_;
DCHECK_LE(0, num_restart);
if (num_compilation_units < static_cast<size_t>(num_restart)) {
num_restart = static_cast<int>(num_compilation_units);
size_t max_num_restart = compilation_unit_queues_.GetTotalSize();
while (!available_task_ids_.empty() && max_num_restart-- > 0) {
int task_id = available_task_ids_.back();
available_task_ids_.pop_back();
new_tasks.emplace_back(
native_module_->engine()
->NewBackgroundCompileTask<BackgroundCompileTask>(
background_compile_token_, async_counters_, task_id));
}
num_background_tasks_ += num_restart;
}
for (; num_restart > 0; --num_restart) {
RestartBackgroundCompileTask();
if (baseline_compilation_finished()) {
for (auto& task : new_tasks) {
V8::GetCurrentPlatform()->CallLowPriorityTaskOnWorkerThread(
std::move(task));
}
} else {
for (auto& task : new_tasks) {
V8::GetCurrentPlatform()->CallOnWorkerThread(std::move(task));
}
}
}
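
{RestartBackgroundTasks} deliberately collects the new tasks while holding {mutex_} but posts them only after the guard is released: as the comment notes, a platform may execute a posted task synchronously (the predictable platform does), and that task would immediately try to take the same mutex. The shape of that pattern in a standalone sketch, with invented names:

#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Decide what to spawn while holding the lock; hand tasks to the
// scheduler only after unlocking, in case it runs them inline.
void SpawnPending(std::mutex* state_mutex,
                  std::vector<std::function<void()>>* pending,
                  const std::function<void(std::function<void()>)>& post) {
  std::vector<std::function<void()>> to_spawn;
  {
    std::lock_guard<std::mutex> guard(*state_mutex);
    to_spawn.swap(*pending);  // Claim the work under the lock.
  }
  for (auto& task : to_spawn) post(std::move(task));  // Lock released.
}
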