Commit 15b1ce39 authored by Leszek Swirski, committed by V8 LUCI CQ

[compiler-dispatcher] Port to Jobs API

Port the CompilerDispatcher to use the Jobs API, instead of its own
hand-rolled worker management.
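
For context, this is the pattern the dispatcher moves to, shown as a
minimal sketch rather than code from this CL (WorkerJobTask and
ProcessOneWorkItem are invented names): the platform asks the JobTask
how many workers are currently useful via GetMaxConcurrency, runs it on
worker threads until ShouldYield() returns true, and producers wake it
up again via NotifyConcurrencyIncrease.

#include <atomic>
#include <cstddef>
#include <memory>

#include "include/v8-platform.h"

// Sketch of a Jobs API worker; illustrative only, not part of this CL.
class WorkerJobTask : public v8::JobTask {
 public:
  explicit WorkerJobTask(std::atomic<std::size_t>* num_pending)
      : num_pending_(num_pending) {}

  void Run(v8::JobDelegate* delegate) override {
    // Keep draining work until the scheduler asks this worker to yield.
    while (!delegate->ShouldYield()) {
      if (!ProcessOneWorkItem()) return;  // Nothing left to do.
    }
  }

  std::size_t GetMaxConcurrency(std::size_t /*worker_count*/) const override {
    // Tell the scheduler how many workers could usefully run right now.
    return num_pending_->load(std::memory_order_relaxed);
  }

 private:
  // Hypothetical helper: runs one queued item, returns false when empty.
  bool ProcessOneWorkItem() { return false; }

  std::atomic<std::size_t>* num_pending_;
};

// Typical driving code (the dispatcher below does the equivalent in its
// constructor, in Enqueue, and in AbortAll):
//   handle = platform->PostJob(v8::TaskPriority::kUserVisible,
//                              std::make_unique<WorkerJobTask>(&pending));
//   handle->NotifyConcurrencyIncrease();  // after adding work
//   handle->Cancel();                     // on shutdown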

This required some re-thinking of how testing is handled, since the
tests want to be able to

  a) Defer calls to PostTask/PostJob, so that the jobs can actually be
     posted later. This was easy enough with PostTask, since we could
     simply store the task in a list and no-op, but PostJob has to
     return a JobHandle. The tests now have a DelayedJobHandleWrapper,
     which defers all method calls on itself, and, because of all the
     unique_ptrs involved, there is also now a SharedJobHandleWrapper
     (see the sketch after this list).

  b) Wait until tasks/jobs complete. Returning from a Task meant that
     the task had completed, but this isn't necessarily the case with
     JobTasks; e.g. a job might be asked to yield. This patch works
     around that by posting and joining a non-owning copy of the
     requested JobTask, and then re-posting it once Join returns.
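
To make (a) concrete, here is a rough sketch of such a deferring
wrapper. It is not the real DelayedJobHandleWrapper from the test
changes; it only assumes the v8::JobHandle virtuals of the time
(NotifyConcurrencyIncrease, Join, Cancel, CancelAndDetach, IsActive,
IsValid) and simply records calls until the real handle exists.

#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include "include/v8-platform.h"

// Illustrative sketch only, not the actual test helper.
class DelayedJobHandleWrapper : public v8::JobHandle {
 public:
  // Called once the test platform really posts the job; replays any
  // calls made while the handle was still "delayed".
  void SetRealHandle(std::unique_ptr<v8::JobHandle> real) {
    real_ = std::move(real);
    for (auto& call : deferred_calls_) call();
    deferred_calls_.clear();
  }

  void NotifyConcurrencyIncrease() override {
    DeferOrForward([this] { real_->NotifyConcurrencyIncrease(); });
  }
  void Join() override { DeferOrForward([this] { real_->Join(); }); }
  void Cancel() override { DeferOrForward([this] { real_->Cancel(); }); }
  void CancelAndDetach() override {
    DeferOrForward([this] { real_->CancelAndDetach(); });
  }
  bool IsActive() override { return real_ && real_->IsActive(); }
  bool IsValid() override { return !real_ || real_->IsValid(); }

 private:
  // Forward immediately if the real handle exists; otherwise remember
  // the call for later.
  void DeferOrForward(std::function<void()> call) {
    if (real_) {
      call();
    } else {
      deferred_calls_.push_back(std::move(call));
    }
  }

  std::unique_ptr<v8::JobHandle> real_;
  std::vector<std::function<void()>> deferred_calls_;
};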

Change-Id: If867b4122af52758ffabcfb78a6701f0f95d896d
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2563664
Auto-Submit: Leszek Swirski <leszeks@chromium.org>
Reviewed-by: Victor Gomes <victorgomes@chromium.org>
Commit-Queue: Leszek Swirski <leszeks@chromium.org>
Cr-Commit-Position: refs/heads/main@{#77618}
parent 7cd43456
@@ -1894,7 +1894,7 @@ bool Compiler::Compile(Isolate* isolate, Handle<SharedFunctionInfo> shared_info,
// Check if the compiler dispatcher has shared_info enqueued for compile.
LazyCompileDispatcher* dispatcher = isolate->lazy_compile_dispatcher();
if (dispatcher->IsEnqueued(shared_info)) {
if (dispatcher && dispatcher->IsEnqueued(shared_info)) {
if (!dispatcher->FinishNow(shared_info)) {
return FailWithPendingException(isolate, script, &parse_info, flag);
}
......
@@ -4,7 +4,11 @@
#include "src/compiler-dispatcher/lazy-compile-dispatcher.h"
#include <atomic>
#include "include/v8-platform.h"
#include "src/ast/ast.h"
#include "src/base/platform/mutex.h"
#include "src/base/platform/time.h"
#include "src/codegen/compiler.h"
#include "src/flags/flags.h"
@@ -21,6 +25,24 @@
namespace v8 {
namespace internal {
class LazyCompileDispatcher::JobTask : public v8::JobTask {
public:
explicit JobTask(LazyCompileDispatcher* lazy_compile_dispatcher)
: lazy_compile_dispatcher_(lazy_compile_dispatcher) {}
void Run(JobDelegate* delegate) final {
lazy_compile_dispatcher_->DoBackgroundWork(delegate);
}
size_t GetMaxConcurrency(size_t worker_count) const final {
return lazy_compile_dispatcher_->num_jobs_for_background_.load(
std::memory_order_relaxed);
}
private:
LazyCompileDispatcher* lazy_compile_dispatcher_;
};
LazyCompileDispatcher::Job::Job(BackgroundCompileTask* task_arg)
: task(task_arg), has_run(false), aborted(false) {}
@@ -39,22 +61,21 @@ LazyCompileDispatcher::LazyCompileDispatcher(Isolate* isolate,
platform_(platform),
max_stack_size_(max_stack_size),
trace_compiler_dispatcher_(FLAG_trace_compiler_dispatcher),
task_manager_(new CancelableTaskManager()),
idle_task_manager_(new CancelableTaskManager()),
next_job_id_(0),
shared_to_unoptimized_job_id_(isolate->heap()),
idle_task_scheduled_(false),
num_worker_tasks_(0),
num_jobs_for_background_(0),
main_thread_blocking_on_job_(nullptr),
block_for_testing_(false),
semaphore_for_testing_(0) {
if (trace_compiler_dispatcher_ && !IsEnabled()) {
PrintF("LazyCompileDispatcher: dispatcher is disabled\n");
}
job_handle_ = platform_->PostJob(TaskPriority::kUserVisible,
std::make_unique<JobTask>(this));
}
LazyCompileDispatcher::~LazyCompileDispatcher() {
// AbortAll must be called before LazyCompileDispatcher is destroyed.
CHECK(task_manager_->canceled());
CHECK(!job_handle_->IsValid());
}
base::Optional<LazyCompileDispatcher::JobId> LazyCompileDispatcher::Enqueue(
@@ -64,8 +85,6 @@ base::Optional<LazyCompileDispatcher::JobId> LazyCompileDispatcher::Enqueue(
"V8.LazyCompilerDispatcherEnqueue");
RCS_SCOPE(isolate_, RuntimeCallCounterId::kCompileEnqueueOnDispatcher);
if (!IsEnabled()) return base::nullopt;
std::unique_ptr<Job> job = std::make_unique<Job>(new BackgroundCompileTask(
isolate_, outer_parse_info, function_name, function_literal,
worker_thread_runtime_call_stats_, background_compile_timer_,
@@ -83,15 +102,13 @@ base::Optional<LazyCompileDispatcher::JobId> LazyCompileDispatcher::Enqueue(
{
base::MutexGuard lock(&mutex_);
pending_background_jobs_.insert(it->second.get());
num_jobs_for_background_ += 1;
VerifyBackgroundTaskCount(lock);
}
ScheduleMoreWorkerTasksIfNeeded();
job_handle_->NotifyConcurrencyIncrease();
return base::make_optional(id);
}
bool LazyCompileDispatcher::IsEnabled() const {
return FLAG_lazy_compile_dispatcher;
}
bool LazyCompileDispatcher::IsEnqueued(
Handle<SharedFunctionInfo> function) const {
if (jobs_.empty()) return false;
@@ -139,7 +156,8 @@ void LazyCompileDispatcher::WaitForJobIfRunningOnBackground(Job* job) {
base::MutexGuard lock(&mutex_);
if (running_background_jobs_.find(job) == running_background_jobs_.end()) {
pending_background_jobs_.erase(job);
num_jobs_for_background_ -= pending_background_jobs_.erase(job);
VerifyBackgroundTaskCount(lock);
return;
}
DCHECK_NULL(main_thread_blocking_on_job_);
@@ -189,7 +207,8 @@ void LazyCompileDispatcher::AbortJob(JobId job_id) {
Job* job = job_it->second.get();
base::LockGuard<base::Mutex> lock(&mutex_);
pending_background_jobs_.erase(job);
num_jobs_for_background_ -= pending_background_jobs_.erase(job);
VerifyBackgroundTaskCount(lock);
if (running_background_jobs_.find(job) == running_background_jobs_.end()) {
RemoveJob(job_it);
} else {
@@ -200,23 +219,18 @@ void LazyCompileDispatcher::AbortJob(JobId job_id) {
}
void LazyCompileDispatcher::AbortAll() {
task_manager_->TryAbortAll();
idle_task_manager_->TryAbortAll();
job_handle_->Cancel();
for (auto& it : jobs_) {
WaitForJobIfRunningOnBackground(it.second.get());
if (trace_compiler_dispatcher_) {
PrintF("LazyCompileDispatcher: aborted job %zu\n", it.first);
}
}
jobs_.clear();
shared_to_unoptimized_job_id_.Clear();
{
base::MutexGuard lock(&mutex_);
DCHECK(pending_background_jobs_.empty());
DCHECK(running_background_jobs_.empty());
pending_background_jobs_.clear();
}
task_manager_->CancelAndWait();
jobs_.clear();
shared_to_unoptimized_job_id_.Clear();
idle_task_manager_->CancelAndWait();
}
LazyCompileDispatcher::JobMap::const_iterator LazyCompileDispatcher::GetJobFor(
@@ -235,30 +249,17 @@ void LazyCompileDispatcher::ScheduleIdleTaskFromAnyThread(
if (idle_task_scheduled_) return;
idle_task_scheduled_ = true;
// TODO(leszeks): Using a full task manager for a single cancellable task is
// overkill, we could probably do the cancelling ourselves.
taskrunner_->PostIdleTask(MakeCancelableIdleTask(
task_manager_.get(),
idle_task_manager_.get(),
[this](double deadline_in_seconds) { DoIdleWork(deadline_in_seconds); }));
}
void LazyCompileDispatcher::ScheduleMoreWorkerTasksIfNeeded() {
void LazyCompileDispatcher::DoBackgroundWork(JobDelegate* delegate) {
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("v8.compile"),
"V8.LazyCompilerDispatcherScheduleMoreWorkerTasksIfNeeded");
{
base::MutexGuard lock(&mutex_);
if (pending_background_jobs_.empty()) return;
if (platform_->NumberOfWorkerThreads() <= num_worker_tasks_) {
return;
}
++num_worker_tasks_;
}
platform_->CallOnWorkerThread(
MakeCancelableTask(task_manager_.get(), [this] { DoBackgroundWork(); }));
}
void LazyCompileDispatcher::DoBackgroundWork() {
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("v8.compile"),
"V8.LazyCompilerDispatcherDoBackgroundWork");
for (;;) {
"V8.LazyCompileDispatcherDoBackgroundWork");
while (!delegate->ShouldYield()) {
Job* job = nullptr;
{
base::MutexGuard lock(&mutex_);
@@ -267,6 +268,7 @@ void LazyCompileDispatcher::DoBackgroundWork() {
job = *it;
pending_background_jobs_.erase(it);
running_background_jobs_.insert(job);
VerifyBackgroundTaskCount(lock);
}
}
if (job == nullptr) break;
@@ -284,7 +286,8 @@ void LazyCompileDispatcher::DoBackgroundWork() {
{
base::MutexGuard lock(&mutex_);
running_background_jobs_.erase(job);
num_jobs_for_background_ -= running_background_jobs_.erase(job);
VerifyBackgroundTaskCount(lock);
job->has_run = true;
if (job->IsReadyToFinalize(lock)) {
@@ -300,10 +303,6 @@
}
}
{
base::MutexGuard lock(&mutex_);
--num_worker_tasks_;
}
// Don't touch |this| anymore after this point, as it might have been
// deleted.
}
@@ -383,5 +382,12 @@ LazyCompileDispatcher::JobMap::const_iterator LazyCompileDispatcher::RemoveJob(
return jobs_.erase(it);
}
#ifdef DEBUG
void LazyCompileDispatcher::VerifyBackgroundTaskCount(const base::MutexGuard&) {
CHECK_EQ(num_jobs_for_background_.load(),
running_background_jobs_.size() + pending_background_jobs_.size());
}
#endif
} // namespace internal
} // namespace v8
@@ -81,9 +81,6 @@ class V8_EXPORT_PRIVATE LazyCompileDispatcher {
LazyCompileDispatcher& operator=(const LazyCompileDispatcher&) = delete;
~LazyCompileDispatcher();
// Returns true if the compiler dispatcher is enabled.
bool IsEnabled() const;
base::Optional<JobId> Enqueue(const ParseInfo* outer_parse_info,
const AstRawString* function_name,
const FunctionLiteral* function_literal);
@@ -117,6 +114,9 @@ class V8_EXPORT_PRIVATE LazyCompileDispatcher {
FRIEND_TEST(LazyCompilerDispatcherTest, AsyncAbortAllRunningWorkerTask);
FRIEND_TEST(LazyCompilerDispatcherTest, CompileMultipleOnBackgroundThread);
// JobTask for PostJob API.
class JobTask;
struct Job {
explicit Job(BackgroundCompileTask* task_arg);
~Job();
@@ -141,15 +141,20 @@ class V8_EXPORT_PRIVATE LazyCompileDispatcher {
void WaitForJobIfRunningOnBackground(Job* job);
JobMap::const_iterator GetJobFor(Handle<SharedFunctionInfo> shared) const;
void ScheduleMoreWorkerTasksIfNeeded();
void ScheduleIdleTaskFromAnyThread(const base::MutexGuard&);
void DoBackgroundWork();
void DoBackgroundWork(JobDelegate* delegate);
void DoIdleWork(double deadline_in_seconds);
// Returns iterator to the inserted job.
JobMap::const_iterator InsertJob(std::unique_ptr<Job> job);
// Returns iterator following the removed job.
JobMap::const_iterator RemoveJob(JobMap::const_iterator job);
#ifdef DEBUG
void VerifyBackgroundTaskCount(const base::MutexGuard&);
#else
void VerifyBackgroundTaskCount(const base::MutexGuard&) {}
#endif
Isolate* isolate_;
WorkerThreadRuntimeCallStats* worker_thread_runtime_call_stats_;
TimedHistogram* background_compile_timer_;
@@ -157,10 +162,12 @@ class V8_EXPORT_PRIVATE LazyCompileDispatcher {
Platform* platform_;
size_t max_stack_size_;
std::unique_ptr<JobHandle> job_handle_;
// Copy of FLAG_trace_compiler_dispatcher to allow for access from any thread.
bool trace_compiler_dispatcher_;
std::unique_ptr<CancelableTaskManager> task_manager_;
std::unique_ptr<CancelableTaskManager> idle_task_manager_;
// Id for next job to be added
JobId next_job_id_;
@@ -179,15 +186,15 @@ class V8_EXPORT_PRIVATE LazyCompileDispatcher {
// True if an idle task is scheduled to be run.
bool idle_task_scheduled_;
// Number of scheduled or running WorkerTask objects.
int num_worker_tasks_;
// The set of jobs that can be run on a background thread.
std::unordered_set<Job*> pending_background_jobs_;
// The set of jobs currently being run on background threads.
std::unordered_set<Job*> running_background_jobs_;
// The total number of jobs, pending and running.
std::atomic<size_t> num_jobs_for_background_;
// If not nullptr, then the main thread waits for the task processing
// this job, and blocks on the ConditionVariable main_thread_blocking_signal_.
Job* main_thread_blocking_on_job_;
......
@@ -3201,9 +3201,10 @@ void Isolate::Deinit() {
delete heap_profiler_;
heap_profiler_ = nullptr;
compiler_dispatcher_->AbortAll();
delete compiler_dispatcher_;
compiler_dispatcher_ = nullptr;
if (lazy_compile_dispatcher_) {
lazy_compile_dispatcher_->AbortAll();
lazy_compile_dispatcher_.reset();
}
delete baseline_batch_compiler_;
baseline_batch_compiler_ = nullptr;
@@ -3667,8 +3668,10 @@ bool Isolate::Init(SnapshotData* startup_snapshot_data,
interpreter_ = new interpreter::Interpreter(this);
bigint_processor_ = bigint::Processor::New(new BigIntPlatform(this));
compiler_dispatcher_ = new LazyCompileDispatcher(
this, V8::GetCurrentPlatform(), FLAG_stack_size);
if (FLAG_lazy_compile_dispatcher) {
lazy_compile_dispatcher_ = std::make_unique<LazyCompileDispatcher>(
this, V8::GetCurrentPlatform(), FLAG_stack_size);
}
baseline_batch_compiler_ = new baseline::BaselineBatchCompiler(this);
// Enable logging before setting up the heap
......
@@ -1676,7 +1676,7 @@ class V8_EXPORT_PRIVATE Isolate final : private HiddenFactory {
AccountingAllocator* allocator() { return allocator_; }
LazyCompileDispatcher* lazy_compile_dispatcher() const {
return compiler_dispatcher_;
return lazy_compile_dispatcher_.get();
}
baseline::BaselineBatchCompiler* baseline_batch_compiler() const {
@@ -2161,7 +2161,7 @@ class V8_EXPORT_PRIVATE Isolate final : private HiddenFactory {
// through all compilations (and thus all JSHeapBroker instances).
Zone* compiler_zone_ = nullptr;
LazyCompileDispatcher* compiler_dispatcher_ = nullptr;
std::unique_ptr<LazyCompileDispatcher> lazy_compile_dispatcher_;
baseline::BaselineBatchCompiler* baseline_batch_compiler_ = nullptr;
using InterruptEntry = std::pair<InterruptCallback, void*>;
......
@@ -170,7 +170,7 @@ UnoptimizedCompileState::UnoptimizedCompileState(Isolate* isolate)
ast_string_constants_(isolate->ast_string_constants()),
logger_(isolate->logger()),
parallel_tasks_(
isolate->lazy_compile_dispatcher()->IsEnabled()
isolate->lazy_compile_dispatcher()
? new ParallelTasks(isolate->lazy_compile_dispatcher())
: nullptr) {}
......