// Copyright 2020 the V8 project authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "src/libplatform/default-job.h" #include "src/base/bits.h" #include "src/base/macros.h" namespace v8 { namespace platform { namespace { // Capped to allow assigning task_ids from a bitfield. constexpr size_t kMaxWorkersPerJob = 32; } // namespace DefaultJobState::JobDelegate::~JobDelegate() { static_assert(kInvalidTaskId >= kMaxWorkersPerJob, "kInvalidTaskId must be outside of the range of valid task_ids " "[0, kMaxWorkersPerJob)"); if (task_id_ != kInvalidTaskId) outer_->ReleaseTaskId(task_id_); } uint8_t DefaultJobState::JobDelegate::GetTaskId() { if (task_id_ == kInvalidTaskId) task_id_ = outer_->AcquireTaskId(); return task_id_; } DefaultJobState::DefaultJobState(Platform* platform, std::unique_ptr<JobTask> job_task, TaskPriority priority, size_t num_worker_threads) : platform_(platform), job_task_(std::move(job_task)), priority_(priority), num_worker_threads_(std::min(num_worker_threads, kMaxWorkersPerJob)) {} DefaultJobState::~DefaultJobState() { DCHECK_EQ(0U, active_workers_); } void DefaultJobState::NotifyConcurrencyIncrease() { if (is_canceled_.load(std::memory_order_relaxed)) return; size_t num_tasks_to_post = 0; TaskPriority priority; { base::MutexGuard guard(&mutex_); const size_t max_concurrency = CappedMaxConcurrency(active_workers_); // Consider |pending_tasks_| to avoid posting too many tasks. if (max_concurrency > (active_workers_ + pending_tasks_)) { num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_; pending_tasks_ += num_tasks_to_post; } priority = priority_; } // Post additional worker tasks to reach |max_concurrency|. for (size_t i = 0; i < num_tasks_to_post; ++i) { CallOnWorkerThread(priority, std::make_unique<DefaultJobWorker>( shared_from_this(), job_task_.get())); } } uint8_t DefaultJobState::AcquireTaskId() { static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8, "TaskId bitfield isn't big enough to fit kMaxWorkersPerJob."); uint32_t assigned_task_ids = assigned_task_ids_.load(std::memory_order_relaxed); DCHECK_LE(v8::base::bits::CountPopulation(assigned_task_ids) + 1, kMaxWorkersPerJob); uint32_t new_assigned_task_ids = 0; uint8_t task_id = 0; // memory_order_acquire on success, matched with memory_order_release in // ReleaseTaskId() so that operations done by previous threads that had // the same task_id become visible to the current thread. do { // Count trailing one bits. This is the id of the right-most 0-bit in // |assigned_task_ids|. task_id = v8::base::bits::CountTrailingZeros32(~assigned_task_ids); new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id); } while (!assigned_task_ids_.compare_exchange_weak( assigned_task_ids, new_assigned_task_ids, std::memory_order_acquire, std::memory_order_relaxed)); return task_id; } void DefaultJobState::ReleaseTaskId(uint8_t task_id) { // memory_order_release to match AcquireTaskId(). uint32_t previous_task_ids = assigned_task_ids_.fetch_and( ~(uint32_t(1) << task_id), std::memory_order_release); DCHECK(previous_task_ids & (uint32_t(1) << task_id)); USE(previous_task_ids); } void DefaultJobState::Join() { bool can_run = false; { base::MutexGuard guard(&mutex_); priority_ = TaskPriority::kUserBlocking; // Reserve a worker for the joining thread. GetMaxConcurrency() is ignored // here, but WaitForParticipationOpportunityLockRequired() waits for // workers to return if necessary so we don't exceed GetMaxConcurrency(). num_worker_threads_ = platform_->NumberOfWorkerThreads() + 1; ++active_workers_; can_run = WaitForParticipationOpportunityLockRequired(); } DefaultJobState::JobDelegate delegate(this, true); while (can_run) { job_task_->Run(&delegate); base::MutexGuard guard(&mutex_); can_run = WaitForParticipationOpportunityLockRequired(); } } void DefaultJobState::CancelAndWait() { { base::MutexGuard guard(&mutex_); is_canceled_.store(true, std::memory_order_relaxed); while (active_workers_ > 0) { worker_released_condition_.Wait(&mutex_); } } } void DefaultJobState::CancelAndDetach() { is_canceled_.store(true, std::memory_order_relaxed); } bool DefaultJobState::IsActive() { base::MutexGuard guard(&mutex_); return job_task_->GetMaxConcurrency(active_workers_) != 0 || active_workers_ != 0; } bool DefaultJobState::CanRunFirstTask() { base::MutexGuard guard(&mutex_); --pending_tasks_; if (is_canceled_.load(std::memory_order_relaxed)) return false; if (active_workers_ >= std::min(job_task_->GetMaxConcurrency(active_workers_), num_worker_threads_)) { return false; } // Acquire current worker. ++active_workers_; return true; } bool DefaultJobState::DidRunTask() { size_t num_tasks_to_post = 0; TaskPriority priority; { base::MutexGuard guard(&mutex_); const size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1); if (is_canceled_.load(std::memory_order_relaxed) || active_workers_ > max_concurrency) { // Release current worker and notify. --active_workers_; worker_released_condition_.NotifyOne(); return false; } // Consider |pending_tasks_| to avoid posting too many tasks. if (max_concurrency > active_workers_ + pending_tasks_) { num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_; pending_tasks_ += num_tasks_to_post; } priority = priority_; } // Post additional worker tasks to reach |max_concurrency| in the case that // max concurrency increased. This is not strictly necessary, since // NotifyConcurrencyIncrease() should eventually be invoked. However, some // users of PostJob() batch work and tend to call NotifyConcurrencyIncrease() // late. Posting here allows us to spawn new workers sooner. for (size_t i = 0; i < num_tasks_to_post; ++i) { CallOnWorkerThread(priority, std::make_unique<DefaultJobWorker>( shared_from_this(), job_task_.get())); } return true; } bool DefaultJobState::WaitForParticipationOpportunityLockRequired() { size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1); while (active_workers_ > max_concurrency && active_workers_ > 1) { worker_released_condition_.Wait(&mutex_); max_concurrency = CappedMaxConcurrency(active_workers_ - 1); } if (active_workers_ <= max_concurrency) return true; DCHECK_EQ(1U, active_workers_); DCHECK_EQ(0U, max_concurrency); active_workers_ = 0; is_canceled_.store(true, std::memory_order_relaxed); return false; } size_t DefaultJobState::CappedMaxConcurrency(size_t worker_count) const { return std::min(job_task_->GetMaxConcurrency(worker_count), num_worker_threads_); } void DefaultJobState::CallOnWorkerThread(TaskPriority priority, std::unique_ptr<Task> task) { switch (priority) { case TaskPriority::kBestEffort: return platform_->CallLowPriorityTaskOnWorkerThread(std::move(task)); case TaskPriority::kUserVisible: return platform_->CallOnWorkerThread(std::move(task)); case TaskPriority::kUserBlocking: return platform_->CallBlockingTaskOnWorkerThread(std::move(task)); } } void DefaultJobState::UpdatePriority(TaskPriority priority) { base::MutexGuard guard(&mutex_); priority_ = priority; } DefaultJobHandle::DefaultJobHandle(std::shared_ptr<DefaultJobState> state) : state_(std::move(state)) { state_->NotifyConcurrencyIncrease(); } DefaultJobHandle::~DefaultJobHandle() { DCHECK_EQ(nullptr, state_); } void DefaultJobHandle::Join() { state_->Join(); state_ = nullptr; } void DefaultJobHandle::Cancel() { state_->CancelAndWait(); state_ = nullptr; } void DefaultJobHandle::CancelAndDetach() { state_->CancelAndDetach(); state_ = nullptr; } bool DefaultJobHandle::IsActive() { return state_->IsActive(); } void DefaultJobHandle::UpdatePriority(TaskPriority priority) { state_->UpdatePriority(priority); } } // namespace platform } // namespace v8