Commit 0af542c8 authored by Clemens Backes, committed by Commit Bot

[wasm] Grow number of compilation queues dynamically

Instead of querying the platform for the number of available threads
and allocating exactly N+1 queues, grow the number of queues
dynamically. This allows for more than N+1 concurrent threads, so
that threads which would otherwise wait idly can contribute to
compilation instead. Support for this will be added in a follow-up CL.

Special care is taken not to synchronize too much between threads:
we take a shared mutex whenever stealing tasks, but not on the
default path, where we pick a unit from the task's own queue.
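
(For illustration: the growth pattern described above is double-checked
locking under a reader-writer lock. A minimal standalone sketch, with
std::shared_mutex and STL containers standing in for V8's base::SharedMutex
and internal types; all names are illustrative, not from this CL.)

#include <cstddef>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <vector>

struct TaskQueue { /* per-task units, guarded by their own mutex */ };

class GrowableQueues {
 public:
  TaskQueue* GetQueueForTask(std::size_t task_id) {
    {
      // Fast path: shared lock only, since the vector is not mutated.
      std::shared_lock<std::shared_mutex> guard(mutex_);
      if (task_id < queues_.size()) return queues_[task_id].get();
    }
    // Slow path: re-check under the exclusive lock, then grow.
    std::unique_lock<std::shared_mutex> guard(mutex_);
    while (queues_.size() <= task_id) {
      queues_.push_back(std::make_unique<TaskQueue>());
    }
    return queues_[task_id].get();
  }

 private:
  std::shared_mutex mutex_;
  std::vector<std::unique_ptr<TaskQueue>> queues_;
};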

R=thibaudm@chromium.org
CC=etiennep@chromium.org

Bug: v8:11005
Change-Id: I1f67f15fb22b95ef246c37eb80c03132d8a1d149
Cq-Include-Trybots: luci.v8.try:v8_linux64_tsan_rel_ng
Cq-Include-Trybots: luci.v8.try:v8_linux64_tsan_isolates_rel_ng
Cq-Include-Trybots: luci.v8.try:v8_linux_gc_stress_dbg_ng
Cq-Include-Trybots: luci.v8.try:v8_mac64_gc_stress_dbg_ng
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2467844
Commit-Queue: Clemens Backes <clemensb@chromium.org>
Reviewed-by: Thibaud Michaud <thibaudm@chromium.org>
Cr-Commit-Position: refs/heads/master@{#70508}
parent 41b5c8d0
@@ -190,33 +190,51 @@ enum CompileBaselineOnly : bool {
// runs empty.
class CompilationUnitQueues {
public:
explicit CompilationUnitQueues(int max_tasks, int num_declared_functions)
: queues_(max_tasks), top_tier_priority_units_queues_(max_tasks) {
DCHECK_LT(0, max_tasks);
for (int task_id = 0; task_id < max_tasks; ++task_id) {
queues_[task_id].next_steal_task_id = next_task_id(task_id);
}
// Public API for QueueImpl.
struct Queue {};
explicit CompilationUnitQueues(int num_declared_functions) {
// Add an initial queue to which units can be added.
queues_.emplace_back(std::make_unique<QueueImpl>(0));
for (auto& atomic_counter : num_units_) {
std::atomic_init(&atomic_counter, size_t{0});
}
treated_ = std::make_unique<std::atomic<bool>[]>(num_declared_functions);
top_tier_compiled_ =
std::make_unique<std::atomic<bool>[]>(num_declared_functions);
for (int i = 0; i < num_declared_functions; i++) {
std::atomic_init(&treated_.get()[i], false);
std::atomic_init(&top_tier_compiled_.get()[i], false);
}
}
base::Optional<WasmCompilationUnit> GetNextUnit(
int task_id, CompileBaselineOnly baseline_only) {
DCHECK_LE(0, task_id);
DCHECK_GT(queues_.size(), task_id);
std::tuple<Queue*, int> GetQueueForTaskAndNumQueues(int task_id) {
size_t required_queues = static_cast<size_t>(task_id) + 1;
{
base::SharedMutexGuard<base::kShared> queues_guard(&queues_mutex_);
if (V8_LIKELY(queues_.size() >= required_queues)) {
return std::make_tuple(queues_[task_id].get(),
static_cast<int>(queues_.size()));
}
}
// Otherwise increase the number of queues.
base::SharedMutexGuard<base::kExclusive> queues_guard(&queues_mutex_);
while (queues_.size() < required_queues) {
int steal_from = static_cast<int>(queues_.size() + 1);
queues_.emplace_back(std::make_unique<QueueImpl>(steal_from));
}
return std::make_tuple(queues_[task_id].get(),
static_cast<int>(queues_.size()));
}
base::Optional<WasmCompilationUnit> GetNextUnit(
Queue* queue, CompileBaselineOnly baseline_only) {
// As long as any lower-tier units are outstanding, we need to steal them
// before executing our own higher-tier units.
int max_tier = baseline_only ? kBaseline : kTopTier;
for (int tier = GetLowestTierWithUnits(); tier <= max_tier; ++tier) {
if (auto unit = GetNextUnitOfTier(task_id, tier)) {
if (auto unit = GetNextUnitOfTier(queue, tier)) {
size_t old_units_count =
num_units_[tier].fetch_sub(1, std::memory_order_relaxed);
DCHECK_LE(1, old_units_count);
@@ -233,13 +251,18 @@ class CompilationUnitQueues {
DCHECK_LT(0, baseline_units.size() + top_tier_units.size());
// Add to the individual queues in a round-robin fashion. No special care is
// taken to balance them; they will be balanced by work stealing.
int queue_to_add = next_queue_to_add.load(std::memory_order_relaxed);
while (!next_queue_to_add.compare_exchange_weak(
queue_to_add, next_task_id(queue_to_add), std::memory_order_relaxed)) {
// Retry with updated {queue_to_add}.
QueueImpl* queue;
{
int queue_to_add = next_queue_to_add.load(std::memory_order_relaxed);
base::SharedMutexGuard<base::kShared> queues_guard(&queues_mutex_);
while (!next_queue_to_add.compare_exchange_weak(
queue_to_add, next_task_id(queue_to_add, queues_.size()),
std::memory_order_relaxed)) {
// Retry with updated {queue_to_add}.
}
queue = queues_[queue_to_add].get();
}
Queue* queue = &queues_[queue_to_add];
base::MutexGuard guard(&queue->mutex);
base::Optional<base::MutexGuard> big_units_guard;
for (auto pair : {std::make_pair(int{kBaseline}, baseline_units),
@@ -265,22 +288,24 @@ class CompilationUnitQueues {
}
void AddTopTierPriorityUnit(WasmCompilationUnit unit, size_t priority) {
base::SharedMutexGuard<base::kShared> queues_guard(&queues_mutex_);
// Add to the individual queues in a round-robin fashion. No special care is
// taken to balance them; they will be balanced by work stealing. We use
// the same counter for this reason.
int queue_to_add = next_queue_to_add.load(std::memory_order_relaxed);
while (!next_queue_to_add.compare_exchange_weak(
queue_to_add, next_task_id(queue_to_add), std::memory_order_relaxed)) {
queue_to_add, next_task_id(queue_to_add, queues_.size()),
std::memory_order_relaxed)) {
// Retry with updated {queue_to_add}.
}
TopTierPriorityUnitsQueue* queue =
&top_tier_priority_units_queues_[queue_to_add];
base::MutexGuard guard(&queue->mutex);
{
auto* queue = queues_[queue_to_add].get();
base::MutexGuard guard(&queue->mutex);
queue->top_tier_priority_units.emplace(priority, unit);
}
num_priority_units_.fetch_add(1, std::memory_order_relaxed);
num_units_[kTopTier].fetch_add(1, std::memory_order_relaxed);
queue->units.emplace(priority, unit);
}
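
(For illustration: the round-robin counter above is advanced with a
compare-exchange loop, so concurrent producers never block on it; a loser of
the race simply retries with the freshly loaded value. A minimal sketch of
the idiom, with illustrative names.)

#include <atomic>
#include <cstddef>

// Advances {next} to the following queue index (wrapping at num_queues) and
// returns the index this caller claimed. On CAS failure, {index} is reloaded
// automatically, so every caller ends up with some valid index.
int NextRoundRobinIndex(std::atomic<int>* next, std::size_t num_queues) {
  int index = next->load(std::memory_order_relaxed);
  int successor;
  do {
    successor = index + 1 == static_cast<int>(num_queues) ? 0 : index + 1;
  } while (!next->compare_exchange_weak(index, successor,
                                        std::memory_order_relaxed));
  return index;
}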
// Get the current total number of units in all queues. This is only a
@@ -304,15 +329,6 @@ class CompilationUnitQueues {
// order of their function body size.
static constexpr size_t kBigUnitsLimit = 4096;
struct Queue {
base::Mutex mutex;
// Protected by {mutex}:
std::vector<WasmCompilationUnit> units[kNumTiers];
int next_steal_task_id;
// End of fields protected by {mutex}.
};
struct BigUnit {
BigUnit(size_t func_size, WasmCompilationUnit unit)
: func_size{func_size}, unit(unit) {}
@@ -351,28 +367,32 @@ class CompilationUnitQueues {
std::priority_queue<BigUnit> units[kNumTiers];
};
struct TopTierPriorityUnitsQueue {
struct QueueImpl : public Queue {
explicit QueueImpl(int next_steal_task_id)
: next_steal_task_id(next_steal_task_id) {}
base::Mutex mutex;
// Protected by {mutex}:
std::priority_queue<TopTierPriorityUnit> units;
// All fields are protected by {mutex}.
std::vector<WasmCompilationUnit> units[kNumTiers];
std::priority_queue<TopTierPriorityUnit> top_tier_priority_units;
int next_steal_task_id;
// End of fields protected by {mutex}.
};
std::vector<Queue> queues_;
BigUnitsQueue big_units_queue_;
// {queues_mutex_} protects {queues_}.
base::SharedMutex queues_mutex_;
std::vector<std::unique_ptr<QueueImpl>> queues_;
std::vector<TopTierPriorityUnitsQueue> top_tier_priority_units_queues_;
BigUnitsQueue big_units_queue_;
std::atomic<size_t> num_units_[kNumTiers];
std::atomic<size_t> num_priority_units_{0};
std::unique_ptr<std::atomic<bool>[]> treated_;
std::unique_ptr<std::atomic<bool>[]> top_tier_compiled_;
std::atomic<int> next_queue_to_add{0};
int next_task_id(int task_id) const {
int next_task_id(int task_id, size_t num_queues) const {
int next = task_id + 1;
return next == static_cast<int>(queues_.size()) ? 0 : next;
return next == static_cast<int>(num_queues) ? 0 : next;
}
int GetLowestTierWithUnits() const {
@@ -382,13 +402,13 @@ class CompilationUnitQueues {
return kNumTiers;
}
base::Optional<WasmCompilationUnit> GetNextUnitOfTier(int task_id, int tier) {
Queue* queue = &queues_[task_id];
base::Optional<WasmCompilationUnit> GetNextUnitOfTier(Queue* public_queue,
int tier) {
QueueImpl* queue = static_cast<QueueImpl*>(public_queue);
// First check whether there is a priority unit. Execute that
// first.
// First check whether there is a priority unit. Execute that first.
if (tier == kTopTier) {
if (auto unit = GetTopTierPriorityUnit(task_id)) {
if (auto unit = GetTopTierPriorityUnit(queue)) {
return unit;
}
}
@@ -411,12 +431,16 @@ class CompilationUnitQueues {
// Try to steal from all other queues. If this succeeds, return one of the
// stolen units.
size_t steal_trials = queues_.size();
for (; steal_trials > 0;
--steal_trials, steal_task_id = next_task_id(steal_task_id)) {
if (steal_task_id == task_id) continue;
if (auto unit = StealUnitsAndGetFirst(task_id, steal_task_id, tier)) {
return unit;
{
base::SharedMutexGuard<base::kShared> guard(&queues_mutex_);
for (size_t steal_trials = 0; steal_trials < queues_.size();
++steal_trials, ++steal_task_id) {
if (steal_task_id >= static_cast<int>(queues_.size())) {
steal_task_id = 0;
}
if (auto unit = StealUnitsAndGetFirst(queue, steal_task_id, tier)) {
return unit;
}
}
}
@@ -425,7 +449,7 @@ class CompilationUnitQueues {
}
base::Optional<WasmCompilationUnit> GetBigUnitOfTier(int tier) {
// Fast-path without locking.
// Fast path without locking.
if (!big_units_queue_.has_units[tier].load(std::memory_order_relaxed)) {
return {};
}
@@ -439,25 +463,22 @@ class CompilationUnitQueues {
return unit;
}
base::Optional<WasmCompilationUnit> GetTopTierPriorityUnit(int task_id) {
// Fast-path without locking.
base::Optional<WasmCompilationUnit> GetTopTierPriorityUnit(QueueImpl* queue) {
// Fast path without locking.
if (num_priority_units_.load(std::memory_order_relaxed) == 0) {
return {};
}
TopTierPriorityUnitsQueue* queue =
&top_tier_priority_units_queues_[task_id];
int steal_task_id;
{
base::MutexGuard mutex_guard(&queue->mutex);
while (!queue->units.empty()) {
auto unit = queue->units.top().unit;
queue->units.pop();
while (!queue->top_tier_priority_units.empty()) {
auto unit = queue->top_tier_priority_units.top().unit;
queue->top_tier_priority_units.pop();
num_priority_units_.fetch_sub(1, std::memory_order_relaxed);
if (!treated_[unit.func_index()].exchange(true,
std::memory_order_relaxed)) {
if (!top_tier_compiled_[unit.func_index()].exchange(
true, std::memory_order_relaxed)) {
return unit;
}
num_units_[kTopTier].fetch_sub(1, std::memory_order_relaxed);
@@ -467,28 +488,34 @@ class CompilationUnitQueues {
// Try to steal from all other queues. If this succeeds, return one of the
// stolen units.
size_t steal_trials = queues_.size();
for (; steal_trials > 0;
--steal_trials, steal_task_id = next_task_id(steal_task_id)) {
if (steal_task_id == task_id) continue;
if (auto unit = StealTopTierPriorityUnit(task_id, steal_task_id)) {
return unit;
{
base::SharedMutexGuard<base::kShared> guard(&queues_mutex_);
for (size_t steal_trials = 0; steal_trials < queues_.size();
++steal_trials, ++steal_task_id) {
if (steal_task_id >= static_cast<int>(queues_.size())) {
steal_task_id = 0;
}
if (auto unit = StealTopTierPriorityUnit(queue, steal_task_id)) {
return unit;
}
}
}
return {};
}
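
(For illustration: both the local pop and the stealing path deduplicate work
through an atomic exchange on the per-function {top_tier_compiled_} flag:
only the thread that flips the flag from false to true keeps the unit. A
sketch of the claim idiom, with illustrative names.)

#include <atomic>

// Returns true iff this caller is the first to claim {func_index}. Relaxed
// ordering follows the CL; the unit data itself is handed over under the
// queue mutexes, not through this flag.
bool TryClaimFunction(std::atomic<bool>* top_tier_compiled, int func_index) {
  return !top_tier_compiled[func_index].exchange(true,
                                                 std::memory_order_relaxed);
}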
// Steal units of {wanted_tier} from {steal_from_task_id} to {task_id}. Return
// Steal units of {wanted_tier} from {steal_from_task_id} to {queue}. Return
// first stolen unit (the rest are put into {queue}), or {nullopt} if
// {steal_from_task_id} had no units of {wanted_tier}.
// Hold a shared lock on {queues_mutex_} when calling this method.
base::Optional<WasmCompilationUnit> StealUnitsAndGetFirst(
int task_id, int steal_from_task_id, int wanted_tier) {
DCHECK_NE(task_id, steal_from_task_id);
QueueImpl* queue, int steal_from_task_id, int wanted_tier) {
auto* steal_queue = queues_[steal_from_task_id].get();
// Cannot steal from own queue.
if (steal_queue == queue) return {};
std::vector<WasmCompilationUnit> stolen;
base::Optional<WasmCompilationUnit> returned_unit;
{
Queue* steal_queue = &queues_[steal_from_task_id];
base::MutexGuard guard(&steal_queue->mutex);
auto* steal_from_vector = &steal_queue->units[wanted_tier];
if (steal_from_vector->empty()) return {};
@@ -498,44 +525,41 @@ class CompilationUnitQueues {
stolen.assign(steal_begin + 1, steal_from_vector->end());
steal_from_vector->erase(steal_begin, steal_from_vector->end());
}
Queue* queue = &queues_[task_id];
base::MutexGuard guard(&queue->mutex);
auto* target_queue = &queue->units[wanted_tier];
target_queue->insert(target_queue->end(), stolen.begin(), stolen.end());
queue->next_steal_task_id = next_task_id(steal_from_task_id);
queue->next_steal_task_id = steal_from_task_id + 1;
return returned_unit;
}
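
(For illustration: the steal takes a batch rather than a single unit, so a
thief rarely needs to return to the same victim. A standalone sketch of the
splitting step, assuming the split point is the middle of the victim's
vector, as in the part of the CL elided above; names are illustrative.)

#include <optional>
#include <vector>

struct Unit { int func_index; };

// Removes the back half of {victim}, returns its first element, and appends
// the remainder to {thief}. Both vectors are assumed to be protected by
// their owners' mutexes, as in the CL.
std::optional<Unit> StealHalf(std::vector<Unit>* victim,
                              std::vector<Unit>* thief) {
  if (victim->empty()) return std::nullopt;
  auto steal_begin = victim->begin() + victim->size() / 2;
  Unit first = *steal_begin;
  thief->insert(thief->end(), steal_begin + 1, victim->end());
  victim->erase(steal_begin, victim->end());
  return first;
}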
// Steal one priority unit from {steal_from_task_id} to {queue}. Return
// stolen unit, or {nullopt} if {steal_from_task_id} had no priority units.
// Hold a shared lock on {queues_mutex_} when calling this method.
base::Optional<WasmCompilationUnit> StealTopTierPriorityUnit(
int task_id, int steal_from_task_id) {
DCHECK_NE(task_id, steal_from_task_id);
QueueImpl* queue, int steal_from_task_id) {
auto* steal_queue = queues_[steal_from_task_id].get();
// Cannot steal from own queue.
if (steal_queue == queue) return {};
base::Optional<WasmCompilationUnit> returned_unit;
{
TopTierPriorityUnitsQueue* steal_queue =
&top_tier_priority_units_queues_[steal_from_task_id];
base::MutexGuard guard(&steal_queue->mutex);
while (true) {
if (steal_queue->units.empty()) return {};
if (steal_queue->top_tier_priority_units.empty()) return {};
auto unit = steal_queue->units.top().unit;
steal_queue->units.pop();
auto unit = steal_queue->top_tier_priority_units.top().unit;
steal_queue->top_tier_priority_units.pop();
num_priority_units_.fetch_sub(1, std::memory_order_relaxed);
if (!treated_[unit.func_index()].exchange(true,
std::memory_order_relaxed)) {
if (!top_tier_compiled_[unit.func_index()].exchange(
true, std::memory_order_relaxed)) {
returned_unit = unit;
break;
}
num_units_[kTopTier].fetch_sub(1, std::memory_order_relaxed);
}
}
TopTierPriorityUnitsQueue* queue =
&top_tier_priority_units_queues_[task_id];
base::MutexGuard guard(&queue->mutex);
queue->next_steal_task_id = next_task_id(steal_from_task_id);
queue->next_steal_task_id = steal_from_task_id + 1;
return returned_unit;
}
};
@@ -585,8 +609,12 @@ class CompilationStateImpl {
js_to_wasm_wrapper_units);
void AddTopTierCompilationUnit(WasmCompilationUnit);
void AddTopTierPriorityCompilationUnit(WasmCompilationUnit, size_t);
std::tuple<CompilationUnitQueues::Queue*, int> GetQueueAndLimitForCompileTask(
int task_id);
base::Optional<WasmCompilationUnit> GetNextCompilationUnit(
int task_id, CompileBaselineOnly baseline_only);
CompilationUnitQueues::Queue*, CompileBaselineOnly);
std::shared_ptr<JSToWasmWrapperCompilationUnit>
GetNextJSToWasmWrapperCompilationUnit();
@@ -596,8 +624,6 @@ class CompilationStateImpl {
void OnFinishedUnits(Vector<WasmCode*>);
void OnFinishedJSToWasmWrapperUnits(int num);
int GetFreeCompileTaskId();
int GetUnpublishedUnitsLimits(int task_id);
void OnCompilationStopped(const WasmFeatures& detected);
void PublishDetectedFeatures(Isolate*);
// Ensure that a compilation job is running, and increase its concurrency if
@@ -661,8 +687,6 @@ class CompilationStateImpl {
// using relaxed semantics.
std::atomic<bool> compile_failed_{false};
const int max_compile_concurrency_ = 0;
CompilationUnitQueues compilation_unit_queues_;
// Index of the next wrapper to compile in {js_to_wasm_wrapper_units_}.
@@ -1218,11 +1242,8 @@ CompilationExecutionResult ExecuteCompilationUnits(
std::shared_ptr<WireBytesStorage> wire_bytes;
std::shared_ptr<const WasmModule> module;
WasmEngine* wasm_engine;
// The Jobs API guarantees that {GetTaskId} is less than the number of
// workers, and that the number of workers is less than or equal to the max
// compile concurrency, which makes the task_id safe to use as an index into
// the worker queues.
int task_id = delegate ? delegate->GetTaskId() : 0;
CompilationUnitQueues::Queue* queue;
int unpublished_units_limit;
base::Optional<WasmCompilationUnit> unit;
@@ -1238,9 +1259,9 @@ CompilationExecutionResult ExecuteCompilationUnits(
wire_bytes = compilation_state->GetWireBytesStorage();
module = compile_scope.native_module()->shared_module();
wasm_engine = compile_scope.native_module()->engine();
unpublished_units_limit =
compilation_state->GetUnpublishedUnitsLimits(task_id);
unit = compilation_state->GetNextCompilationUnit(task_id, baseline_only);
std::tie(queue, unpublished_units_limit) =
compilation_state->GetQueueAndLimitForCompileTask(task_id);
unit = compilation_state->GetNextCompilationUnit(queue, baseline_only);
if (!unit) return kNoMoreUnits;
}
TRACE_COMPILE("ExecuteCompilationUnits (task id %d)\n", task_id);
@@ -1307,7 +1328,7 @@ CompilationExecutionResult ExecuteCompilationUnits(
// Yield or get next unit.
if (yield ||
!(unit = compile_scope.compilation_state()->GetNextCompilationUnit(
task_id, baseline_only))) {
queue, baseline_only))) {
publish_results(&compile_scope);
compile_scope.compilation_state()->OnCompilationStopped(
detected_features);
@@ -1590,11 +1611,8 @@ void CompileNativeModule(Isolate* isolate,
class BackgroundCompileJob : public JobTask {
public:
explicit BackgroundCompileJob(std::shared_ptr<BackgroundCompileToken> token,
std::shared_ptr<Counters> async_counters,
size_t max_concurrency)
: token_(std::move(token)),
async_counters_(std::move(async_counters)),
max_concurrency_(max_concurrency) {}
std::shared_ptr<Counters> async_counters)
: token_(std::move(token)), async_counters_(std::move(async_counters)) {}
void Run(JobDelegate* delegate) override {
ExecuteCompilationUnits(token_, async_counters_.get(), delegate,
@@ -1606,15 +1624,16 @@ class BackgroundCompileJob : public JobTask {
if (scope.cancelled()) return 0;
// NumOutstandingCompilations() does not reflect the units that running
// workers are processing, thus add the current worker count to that number.
return std::min<size_t>(
max_concurrency_,
size_t flag_limit =
static_cast<size_t>(std::max(1, FLAG_wasm_num_compilation_tasks));
return std::min(
flag_limit,
worker_count + scope.compilation_state()->NumOutstandingCompilations());
}
private:
const std::shared_ptr<BackgroundCompileToken> token_;
const std::shared_ptr<Counters> async_counters_;
const size_t max_concurrency_;
};
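
(For illustration: with {max_concurrency_} gone, the cap is re-derived from
the flag on each GetMaxConcurrency call, so flag changes and new units take
effect immediately. A sketch with a worked example in the comments;
{flag_tasks} stands in for FLAG_wasm_num_compilation_tasks.)

#include <algorithm>
#include <cstddef>

// Dynamic concurrency cap: the flag bounds the worker count, while the
// outstanding work keeps the job from requesting idle workers.
std::size_t MaxConcurrency(int flag_tasks, std::size_t worker_count,
                           std::size_t outstanding_units) {
  std::size_t flag_limit =
      static_cast<std::size_t>(std::max(1, flag_tasks));
  // E.g. flag=8, 3 workers, 10 outstanding -> min(8, 13) = 8;
  //      flag=8, 1 worker,   2 outstanding -> min(8,  3) = 3.
  return std::min(flag_limit, worker_count + outstanding_units);
}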
} // namespace
@@ -2722,8 +2741,7 @@ bool AsyncStreamingProcessor::Deserialize(Vector<const uint8_t> module_bytes,
return true;
}
// TODO(wasm): Try to avoid the {NumberOfWorkerThreads} calls, grow queues
// dynamically instead.
// TODO(wasm): Use the jobs API for wrapper compilation, remove this method.
int GetMaxCompileConcurrency() {
int num_worker_threads = V8::GetCurrentPlatform()->NumberOfWorkerThreads();
return std::min(FLAG_wasm_num_compilation_tasks, num_worker_threads);
@@ -2740,11 +2758,7 @@ CompilationStateImpl::CompilationStateImpl(
? CompileMode::kTiering
: CompileMode::kRegular),
async_counters_(std::move(async_counters)),
max_compile_concurrency_(std::max(GetMaxCompileConcurrency(), 1)),
// Add one to the allowed number of parallel tasks, because the foreground
// task sometimes also contributes.
compilation_unit_queues_(max_compile_concurrency_ + 1,
native_module->num_functions()) {}
compilation_unit_queues_(native_module->num_functions()) {}
void CompilationStateImpl::CancelCompilation() {
background_compile_token_->Cancel();
@@ -2988,10 +3002,32 @@ void CompilationStateImpl::FinalizeJSToWasmWrappers(
}
}
std::tuple<CompilationUnitQueues::Queue*, int>
CompilationStateImpl::GetQueueAndLimitForCompileTask(int task_id) {
CompilationUnitQueues::Queue* queue;
int num_queues;
std::tie(queue, num_queues) =
compilation_unit_queues_.GetQueueForTaskAndNumQueues(task_id);
// We want background threads to publish regularly (to avoid contention when
// they are all publishing at the end). On the other hand, each publish has
// some overhead (part of it for synchronizing between threads), so it should
// not happen *too* often.
// Thus aim for 4-8 publishes per thread, but distribute them such that
// publishing is likely to happen at different times.
int units_per_thread = static_cast<int>(
native_module_->module()->num_declared_functions / num_queues);
int min = units_per_thread / 8;
// Return something between {min} and {2*min}, but not smaller than {10}.
int limit = std::max(10, min + (min * task_id / num_queues));
return std::make_tuple(queue, limit);
}
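
(For illustration: with, say, 8000 declared functions and 8 queues,
units_per_thread is 1000 and min is 125, so task 0 publishes every 125 units
while task 3 may hold max(10, 125 + 125*3/8) = 171 unpublished units,
staggering publish points across threads. A sketch with these illustrative
numbers in the comments.)

#include <algorithm>

// Per-task limit on unpublished units: between {min} and {2*min}, staggered
// by task id, and never below 10.
int UnpublishedUnitsLimit(int num_declared_functions, int num_queues,
                          int task_id) {
  int units_per_thread = num_declared_functions / num_queues;
  int min = units_per_thread / 8;  // yields 4-8 publishes per thread
  // E.g. 8000 functions, 8 queues: task 0 -> 125, task 3 -> 171, task 7 -> 234.
  return std::max(10, min + (min * task_id / num_queues));
}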
base::Optional<WasmCompilationUnit>
CompilationStateImpl::GetNextCompilationUnit(
int task_id, CompileBaselineOnly baseline_only) {
return compilation_unit_queues_.GetNextUnit(task_id, baseline_only);
CompilationUnitQueues::Queue* queue, CompileBaselineOnly baseline_only) {
return compilation_unit_queues_.GetNextUnit(queue, baseline_only);
}
void CompilationStateImpl::OnFinishedUnits(Vector<WasmCode*> code_vector) {
@@ -3151,21 +3187,6 @@ void CompilationStateImpl::TriggerCallbacks(
}
}
int CompilationStateImpl::GetUnpublishedUnitsLimits(int task_id) {
// We want background threads to publish regularly (to avoid contention when
// they are all publishing at the end). On the other side, each publishing has
// some overhead (part of it for synchronizing between threads), so it should
// not happen *too* often.
// Thus aim for 4-8 publishes per thread, but distribute it such that
// publishing is likely to happen at different times.
int units_per_thread =
static_cast<int>(native_module_->module()->num_declared_functions /
max_compile_concurrency_);
int min = units_per_thread / 8;
// Return something between {min} and {2*min}, but not smaller than {10}.
return std::max(10, min + (min * task_id / max_compile_concurrency_));
}
void CompilationStateImpl::OnCompilationStopped(const WasmFeatures& detected) {
base::MutexGuard guard(&mutex_);
detected_features_.Add(detected);
@@ -3187,8 +3208,8 @@ void CompilationStateImpl::ScheduleCompileJobForNewUnits() {
if (failed()) return;
std::unique_ptr<JobTask> new_compile_job =
std::make_unique<BackgroundCompileJob>(
background_compile_token_, async_counters_, max_compile_concurrency_);
std::make_unique<BackgroundCompileJob>(background_compile_token_,
async_counters_);
// TODO(wasm): Lower priority for TurboFan-only jobs.
current_compile_job_ = V8::GetCurrentPlatform()->PostJob(
TaskPriority::kUserVisible, std::move(new_compile_job));
@@ -3249,7 +3270,6 @@ class CompileJSToWasmWrapperJob final : public JobTask {
size_t max_concurrency)
: queue_(queue),
compilation_units_(compilation_units),
max_concurrency_(max_concurrency),
outstanding_units_(queue->size()) {}
void Run(JobDelegate* delegate) override {
@@ -3265,14 +3285,15 @@ class CompileJSToWasmWrapperJob final : public JobTask {
// {outstanding_units_} includes the units that other workers are currently
// working on, so we can safely ignore the {worker_count} and just return
// the current number of outstanding units.
return std::min(max_concurrency_,
size_t flag_limit =
static_cast<size_t>(std::max(1, FLAG_wasm_num_compilation_tasks));
return std::min(flag_limit,
outstanding_units_.load(std::memory_order_relaxed));
}
private:
JSToWasmWrapperQueue* const queue_;
JSToWasmWrapperUnitMap* const compilation_units_;
const size_t max_concurrency_;
std::atomic<size_t> outstanding_units_;
};
} // namespace
@@ -1468,7 +1468,7 @@ void InstanceBuilder::CompileImportWrappers(
}
CancelableTaskManager task_manager;
// TODO(wasm): Switch this to the Jobs API.
// TODO(wasm): Switch this to the Jobs API, remove {GetMaxCompileConcurrency}.
const int max_background_tasks = GetMaxCompileConcurrency();
for (int i = 0; i < max_background_tasks; ++i) {
auto task = std::make_unique<CompileImportWrapperTask>(