Commit 7103dc61 authored by Clemens Backes's avatar Clemens Backes Committed by Commit Bot

[wasm] Fix regular publishing of compilation results

The logic for ensuring regular publishing in worker threads was broken
by growing the number of queues dynamically
(https://crrev.com/c/2467844). The first task(s) would assume a too
small number of worker threads, thus would publish to late (or never
before running out of units). This creates a large backlog of
to-be-published results when all threads eventually finish execution.

This CL fixes this by updating the per-task limit of results to process
before publishing. The updated value is read atomically using relaxed
memory ordering to ensure minimal impact on performance.

R=thibaudm@chromium.org

Bug: chromium:1138784, v8:11005
Change-Id: I2d00e50148e64db67a6b1a9f219ba60a1f4432ac
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2484365Reviewed-by: 's avatarThibaud Michaud <thibaudm@chromium.org>
Commit-Queue: Clemens Backes <clemensb@chromium.org>
Cr-Commit-Position: refs/heads/master@{#70646}
parent 82f6863a
...@@ -110,9 +110,12 @@ enum CompileBaselineOnly : bool { ...@@ -110,9 +110,12 @@ enum CompileBaselineOnly : bool {
class CompilationUnitQueues { class CompilationUnitQueues {
public: public:
// Public API for QueueImpl. // Public API for QueueImpl.
struct Queue {}; struct Queue {
bool ShouldPublish(int num_processed_units) const;
};
explicit CompilationUnitQueues(int num_declared_functions) { explicit CompilationUnitQueues(int num_declared_functions)
: num_declared_functions_(num_declared_functions) {
// Add one first queue, to add units to. // Add one first queue, to add units to.
queues_.emplace_back(std::make_unique<QueueImpl>(0)); queues_.emplace_back(std::make_unique<QueueImpl>(0));
...@@ -128,23 +131,41 @@ class CompilationUnitQueues { ...@@ -128,23 +131,41 @@ class CompilationUnitQueues {
} }
} }
std::tuple<Queue*, int> GetQueueForTaskAndNumQueues(int task_id) { Queue* GetQueueForTask(int task_id) {
size_t required_queues = static_cast<size_t>(task_id) + 1; int required_queues = task_id + 1;
{ {
base::SharedMutexGuard<base::kShared> queues_guard(&queues_mutex_); base::SharedMutexGuard<base::kShared> queues_guard(&queues_mutex_);
if (V8_LIKELY(queues_.size() >= required_queues)) { if (V8_LIKELY(static_cast<int>(queues_.size()) >= required_queues)) {
return std::make_tuple(queues_[task_id].get(), return queues_[task_id].get();
static_cast<int>(queues_.size()));
} }
} }
// Otherwise increase the number of queues. // Otherwise increase the number of queues.
base::SharedMutexGuard<base::kExclusive> queues_guard(&queues_mutex_); base::SharedMutexGuard<base::kExclusive> queues_guard(&queues_mutex_);
while (queues_.size() < required_queues) { int num_queues = static_cast<int>(queues_.size());
int steal_from = static_cast<int>(queues_.size() + 1); while (num_queues < required_queues) {
int steal_from = num_queues + 1;
queues_.emplace_back(std::make_unique<QueueImpl>(steal_from)); queues_.emplace_back(std::make_unique<QueueImpl>(steal_from));
++num_queues;
} }
return std::make_tuple(queues_[task_id].get(),
static_cast<int>(queues_.size())); // Update the {publish_limit}s of all queues.
// We want background threads to publish regularly (to avoid contention when
// they are all publishing at the end). On the other side, each publishing
// has some overhead (part of it for synchronizing between threads), so it
// should not happen *too* often. Thus aim for 4-8 publishes per thread, but
// distribute it such that publishing is likely to happen at different
// times.
int units_per_thread = num_declared_functions_ / num_queues;
int min = std::max(10, units_per_thread / 8);
for (auto& queue : queues_) {
// Set a limit between {min} and {2*min}, but not smaller than {10}.
int limit = min + (min * task_id / num_queues);
queue->publish_limit.store(limit, std::memory_order_relaxed);
}
return queues_[task_id].get();
} }
base::Optional<WasmCompilationUnit> GetNextUnit( base::Optional<WasmCompilationUnit> GetNextUnit(
...@@ -290,25 +311,20 @@ class CompilationUnitQueues { ...@@ -290,25 +311,20 @@ class CompilationUnitQueues {
explicit QueueImpl(int next_steal_task_id) explicit QueueImpl(int next_steal_task_id)
: next_steal_task_id(next_steal_task_id) {} : next_steal_task_id(next_steal_task_id) {}
// Number of units after which the task processing this queue should publish
// compilation results. Updated (reduced, using relaxed ordering) when new
// queues are allocated. If there is only one thread running, we can delay
// publishing arbitrarily.
std::atomic<int> publish_limit{kMaxInt};
base::Mutex mutex; base::Mutex mutex;
// All fields are protected by {mutex}. // All fields below are protected by {mutex}.
std::vector<WasmCompilationUnit> units[kNumTiers]; std::vector<WasmCompilationUnit> units[kNumTiers];
std::priority_queue<TopTierPriorityUnit> top_tier_priority_units; std::priority_queue<TopTierPriorityUnit> top_tier_priority_units;
int next_steal_task_id; int next_steal_task_id;
}; };
// {queues_mutex_} protectes {queues_};
base::SharedMutex queues_mutex_;
std::vector<std::unique_ptr<QueueImpl>> queues_;
BigUnitsQueue big_units_queue_;
std::atomic<size_t> num_units_[kNumTiers];
std::atomic<size_t> num_priority_units_{0};
std::unique_ptr<std::atomic<bool>[]> top_tier_compiled_;
std::atomic<int> next_queue_to_add{0};
int next_task_id(int task_id, size_t num_queues) const { int next_task_id(int task_id, size_t num_queues) const {
int next = task_id + 1; int next = task_id + 1;
return next == static_cast<int>(num_queues) ? 0 : next; return next == static_cast<int>(num_queues) ? 0 : next;
...@@ -481,8 +497,28 @@ class CompilationUnitQueues { ...@@ -481,8 +497,28 @@ class CompilationUnitQueues {
queue->next_steal_task_id = steal_from_task_id + 1; queue->next_steal_task_id = steal_from_task_id + 1;
return returned_unit; return returned_unit;
} }
// {queues_mutex_} protectes {queues_};
base::SharedMutex queues_mutex_;
std::vector<std::unique_ptr<QueueImpl>> queues_;
const int num_declared_functions_;
BigUnitsQueue big_units_queue_;
std::atomic<size_t> num_units_[kNumTiers];
std::atomic<size_t> num_priority_units_{0};
std::unique_ptr<std::atomic<bool>[]> top_tier_compiled_;
std::atomic<int> next_queue_to_add{0};
}; };
bool CompilationUnitQueues::Queue::ShouldPublish(
int num_processed_units) const {
auto* queue = static_cast<const QueueImpl*>(this);
return num_processed_units >=
queue->publish_limit.load(std::memory_order_relaxed);
}
// The {CompilationStateImpl} keeps track of the compilation state of the // The {CompilationStateImpl} keeps track of the compilation state of the
// owning NativeModule, i.e. which functions are left to be compiled. // owning NativeModule, i.e. which functions are left to be compiled.
// It contains a task manager to allow parallel and asynchronous background // It contains a task manager to allow parallel and asynchronous background
...@@ -530,8 +566,7 @@ class CompilationStateImpl { ...@@ -530,8 +566,7 @@ class CompilationStateImpl {
void AddTopTierCompilationUnit(WasmCompilationUnit); void AddTopTierCompilationUnit(WasmCompilationUnit);
void AddTopTierPriorityCompilationUnit(WasmCompilationUnit, size_t); void AddTopTierPriorityCompilationUnit(WasmCompilationUnit, size_t);
std::tuple<CompilationUnitQueues::Queue*, int> GetQueueAndLimitForCompileTask( CompilationUnitQueues::Queue* GetQueueForCompileTask(int task_id);
int task_id);
base::Optional<WasmCompilationUnit> GetNextCompilationUnit( base::Optional<WasmCompilationUnit> GetNextCompilationUnit(
CompilationUnitQueues::Queue*, CompileBaselineOnly); CompilationUnitQueues::Queue*, CompileBaselineOnly);
...@@ -1176,7 +1211,6 @@ CompilationExecutionResult ExecuteCompilationUnits( ...@@ -1176,7 +1211,6 @@ CompilationExecutionResult ExecuteCompilationUnits(
int task_id = delegate ? (int{delegate->GetTaskId()} + 1) : 0; int task_id = delegate ? (int{delegate->GetTaskId()} + 1) : 0;
DCHECK_LE(0, task_id); DCHECK_LE(0, task_id);
CompilationUnitQueues::Queue* queue; CompilationUnitQueues::Queue* queue;
int unpublished_units_limit;
base::Optional<WasmCompilationUnit> unit; base::Optional<WasmCompilationUnit> unit;
WasmFeatures detected_features = WasmFeatures::None(); WasmFeatures detected_features = WasmFeatures::None();
...@@ -1191,8 +1225,7 @@ CompilationExecutionResult ExecuteCompilationUnits( ...@@ -1191,8 +1225,7 @@ CompilationExecutionResult ExecuteCompilationUnits(
wire_bytes = compilation_state->GetWireBytesStorage(); wire_bytes = compilation_state->GetWireBytesStorage();
module = compile_scope.native_module()->shared_module(); module = compile_scope.native_module()->shared_module();
wasm_engine = compile_scope.native_module()->engine(); wasm_engine = compile_scope.native_module()->engine();
std::tie(queue, unpublished_units_limit) = queue = compilation_state->GetQueueForCompileTask(task_id);
compilation_state->GetQueueAndLimitForCompileTask(task_id);
unit = compilation_state->GetNextCompilationUnit(queue, baseline_only); unit = compilation_state->GetNextCompilationUnit(queue, baseline_only);
if (!unit) return kNoMoreUnits; if (!unit) return kNoMoreUnits;
} }
...@@ -1241,8 +1274,7 @@ CompilationExecutionResult ExecuteCompilationUnits( ...@@ -1241,8 +1274,7 @@ CompilationExecutionResult ExecuteCompilationUnits(
// Also publish after finishing a certain amount of units, to avoid // Also publish after finishing a certain amount of units, to avoid
// contention when all threads publish at the end. // contention when all threads publish at the end.
if (unit->tier() == ExecutionTier::kTurbofan || if (unit->tier() == ExecutionTier::kTurbofan ||
static_cast<int>(results_to_publish.size()) >= queue->ShouldPublish(static_cast<int>(results_to_publish.size()))) {
unpublished_units_limit) {
std::vector<std::unique_ptr<WasmCode>> unpublished_code = std::vector<std::unique_ptr<WasmCode>> unpublished_code =
compile_scope.native_module()->AddCompiledCode( compile_scope.native_module()->AddCompiledCode(
VectorOf(std::move(results_to_publish))); VectorOf(std::move(results_to_publish)));
...@@ -2909,26 +2941,9 @@ void CompilationStateImpl::FinalizeJSToWasmWrappers( ...@@ -2909,26 +2941,9 @@ void CompilationStateImpl::FinalizeJSToWasmWrappers(
} }
} }
std::tuple<CompilationUnitQueues::Queue*, int> CompilationUnitQueues::Queue* CompilationStateImpl::GetQueueForCompileTask(
CompilationStateImpl::GetQueueAndLimitForCompileTask(int task_id) { int task_id) {
CompilationUnitQueues::Queue* queue; return compilation_unit_queues_.GetQueueForTask(task_id);
int num_queues;
std::tie(queue, num_queues) =
compilation_unit_queues_.GetQueueForTaskAndNumQueues(task_id);
// We want background threads to publish regularly (to avoid contention when
// they are all publishing at the end). On the other side, each publishing has
// some overhead (part of it for synchronizing between threads), so it should
// not happen *too* often.
// Thus aim for 4-8 publishes per thread, but distribute it such that
// publishing is likely to happen at different times.
int units_per_thread = static_cast<int>(
native_module_->module()->num_declared_functions / num_queues);
int min = units_per_thread / 8;
// Return something between {min} and {2*min}, but not smaller than {10}.
int limit = std::max(10, min + (min * task_id / num_queues));
return std::make_tuple(queue, limit);
} }
base::Optional<WasmCompilationUnit> base::Optional<WasmCompilationUnit>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment