Commit 79e3fc0d authored by Clemens Backes's avatar Clemens Backes Committed by V8 LUCI CQ

[libplatform] Spawn more tasks from DefaultJobState::Join

If DefaultJobState::Join is called before any worker tasks were spawned
(e.g. right after Platform::CreateJob), it should spawn the required
number of worker tasks (mimicing what Platform::PostJob followed by Join
would do, but with less context switches).
This fixes regressions we got from switching from Platform::PostJob to
Platform::CreateJob.

R=mlippautz@chromium.org
CC=etiennep@chromium.org

Bug: chromium:1348512
Change-Id: Ic7984d12a28fc67f4b2f51ddc2ba5a406e43c127
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/3804600Reviewed-by: 's avatarMichael Lippautz <mlippautz@chromium.org>
Commit-Queue: Clemens Backes <clemensb@chromium.org>
Cr-Commit-Position: refs/heads/main@{#82178}
parent a2a5d041
......@@ -48,7 +48,7 @@ void DefaultJobState::NotifyConcurrencyIncrease() {
base::MutexGuard guard(&mutex_);
const size_t max_concurrency = CappedMaxConcurrency(active_workers_);
// Consider |pending_tasks_| to avoid posting too many tasks.
if (max_concurrency > (active_workers_ + pending_tasks_)) {
if (max_concurrency > active_workers_ + pending_tasks_) {
num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
pending_tasks_ += num_tasks_to_post;
}
......@@ -93,22 +93,55 @@ void DefaultJobState::ReleaseTaskId(uint8_t task_id) {
}
void DefaultJobState::Join() {
bool can_run = false;
auto WaitForParticipationOpportunity = [this]() -> size_t {
// Subtract one from active_workers_ since the current thread is not
// participating yet.
size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
// Wait until we can participate in the job.
while (active_workers_ > max_concurrency && active_workers_ > 1) {
worker_released_condition_.Wait(&mutex_);
max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
}
DCHECK_LE(0, max_concurrency);
if (max_concurrency != 0) return max_concurrency;
// The job is done (max_concurrency dropped to zero).
DCHECK_EQ(1, active_workers_);
active_workers_ = 0;
is_canceled_.store(true, std::memory_order_relaxed);
return 0;
};
size_t num_tasks_to_post = 0;
{
base::MutexGuard guard(&mutex_);
priority_ = TaskPriority::kUserBlocking;
// Reserve a worker for the joining thread. GetMaxConcurrency() is ignored
// here, but WaitForParticipationOpportunityLockRequired() waits for
// workers to return if necessary so we don't exceed GetMaxConcurrency().
num_worker_threads_ = platform_->NumberOfWorkerThreads() + 1;
// Reserve a worker for the joining (current) thread.
// GetMaxConcurrency() is ignored here, but if necessary we wait below
// for workers to return so we don't exceed GetMaxConcurrency().
++num_worker_threads_;
++active_workers_;
can_run = WaitForParticipationOpportunityLockRequired();
size_t max_concurrency = WaitForParticipationOpportunity();
if (max_concurrency == 0) return;
// Compute the number of additional worker tasks to spawn.
if (max_concurrency > active_workers_ + pending_tasks_) {
num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
pending_tasks_ += num_tasks_to_post;
}
}
// Spawn more worker tasks if needed.
for (size_t i = 0; i < num_tasks_to_post; ++i) {
CallOnWorkerThread(TaskPriority::kUserBlocking,
std::make_unique<DefaultJobWorker>(shared_from_this(),
job_task_.get()));
}
DefaultJobState::JobDelegate delegate(this, true);
while (can_run) {
while (true) {
// Participate in job execution, as one active worker.
job_task_->Run(&delegate);
base::MutexGuard guard(&mutex_);
can_run = WaitForParticipationOpportunityLockRequired();
if (WaitForParticipationOpportunity() == 0) return;
}
}
......@@ -136,10 +169,7 @@ bool DefaultJobState::CanRunFirstTask() {
base::MutexGuard guard(&mutex_);
--pending_tasks_;
if (is_canceled_.load(std::memory_order_relaxed)) return false;
if (active_workers_ >= std::min(job_task_->GetMaxConcurrency(active_workers_),
num_worker_threads_)) {
return false;
}
if (active_workers_ >= CappedMaxConcurrency(active_workers_)) return false;
// Acquire current worker.
++active_workers_;
return true;
......@@ -177,20 +207,6 @@ bool DefaultJobState::DidRunTask() {
return true;
}
bool DefaultJobState::WaitForParticipationOpportunityLockRequired() {
size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
while (active_workers_ > max_concurrency && active_workers_ > 1) {
worker_released_condition_.Wait(&mutex_);
max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
}
if (active_workers_ <= max_concurrency) return true;
DCHECK_EQ(1U, active_workers_);
DCHECK_EQ(0U, max_concurrency);
active_workers_ = 0;
is_canceled_.store(true, std::memory_order_relaxed);
return false;
}
size_t DefaultJobState::CappedMaxConcurrency(size_t worker_count) const {
return std::min(job_task_->GetMaxConcurrency(worker_count),
num_worker_threads_);
......
......@@ -75,13 +75,6 @@ class V8_PLATFORM_EXPORT DefaultJobState
void UpdatePriority(TaskPriority);
private:
// Called from the joining thread. Waits for the worker count to be below or
// equal to max concurrency (will happen when a worker calls
// DidRunTask()). Returns true if the joining thread should run a task, or
// false if joining was completed and all other workers returned because
// there's no work remaining.
bool WaitForParticipationOpportunityLockRequired();
// Returns GetMaxConcurrency() capped by the number of threads used by this
// job.
size_t CappedMaxConcurrency(size_t worker_count) const;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment