Commit 902e181d authored by Clemens Backes, committed by Commit Bot

[wasm][serialization] Use Jobs to avoid blocking

Previously we spawned exactly one task for each of copy&reloc and publishing.
Those tasks blocked until work was available. This can block background
threads which could otherwise execute other components' work.
Switching to the Job API allows us to easily avoid that blocking, and
to just respawn a task when more work is available.
It also avoids code duplication for participating in the work in the
main thread. Instead we just {Join()} the existing job, which makes the
current thread participate in work.

For now, both Jobs set a maximum concurrency of one, so the main thread
will only do work if no background thread is currently running. This can
be lifted in a follow-up CL to see the performance impact of both
changes independently.

R=thibaudm@chromium.org

Bug: v8:11164
Change-Id: I032153eb933648a750b113f5d766feb85b87070a
Cq-Include-Trybots: luci.v8.try:v8_linux64_tsan_rel_ng
Cq-Include-Trybots: luci.v8.try:v8_linux64_tsan_isolates_rel_ng
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2643393
Reviewed-by: Thibaud Michaud <thibaudm@chromium.org>
Commit-Queue: Clemens Backes <clemensb@chromium.org>
Cr-Commit-Position: refs/heads/master@{#72288}
parent 7ae8c713
......@@ -476,31 +476,26 @@ struct DeserializationUnit {
// Mutex-guarded FIFO of deserialization batches; each batch is a
// heap-allocated vector of {DeserializationUnit}.
// NOTE(review): this listing is a diff hunk whose +/- markers were stripped,
// so lines of the old and the new version appear interleaved (e.g. {Pop}
// shows both a blocking wait and an early {nullptr} return, and {UnlockedPop}
// has no closing brace of its own). Presumably the condition variable, the
// wait loop and {UnlockedPop} are the removed side, while {DCHECK_NOT_NULL},
// the empty-queue check and {NumBatches} are the added side -- confirm
// against the actual commit.
class DeserializationQueue {
public:
// Appends {batch} to the back of the queue while holding {mutex_}.
void Add(std::unique_ptr<std::vector<DeserializationUnit>> batch) {
DCHECK_NOT_NULL(batch);
base::MutexGuard guard(&mutex_);
queue_.push(std::move(batch));
cv_.NotifyOne();
}
// Removes and returns the batch at the front of the queue while holding
// {mutex_}.
std::unique_ptr<std::vector<DeserializationUnit>> Pop() {
base::MutexGuard guard(&mutex_);
// Blocking form: sleep on {cv_} until the queue is non-empty.
while (queue_.empty()) {
cv_.Wait(&mutex_);
}
// Non-blocking form: an empty queue yields {nullptr}.
if (queue_.empty()) return nullptr;
auto batch = std::move(queue_.front());
if (batch) queue_.pop();
queue_.pop();
return batch;
}
// Like {Pop}, but does not take {mutex_}; the queue must be non-empty
// (checked by the DCHECK).
std::unique_ptr<std::vector<DeserializationUnit>> UnlockedPop() {
DCHECK(!queue_.empty());
auto batch = std::move(queue_.front());
queue_.pop();
return batch;
// Returns the number of batches currently queued, read under {mutex_}.
size_t NumBatches() {
base::MutexGuard guard(&mutex_);
return queue_.size();
}
private:
// Guards {queue_} in {Add}, {Pop} and {NumBatches}.
base::Mutex mutex_;
// Signalled by {Add}; waited on by the blocking form of {Pop}.
base::ConditionVariable cv_;
std::queue<std::unique_ptr<std::vector<DeserializationUnit>>> queue_;
};
......@@ -528,57 +523,67 @@ class V8_EXPORT_PRIVATE NativeModuleDeserializer {
#endif
};
// Takes batches from {from_queue}, calls {CopyAndRelocate} on every unit of a
// batch, and then hands the batch over to {to_queue}.
// NOTE(review): this listing is a diff hunk whose +/- markers were stripped,
// so two class headers, two constructors and two run methods appear
// interleaved. Presumably the {CancelableTask} / {RunInternal} / reference
// member lines are the removed side and the {JobTask} / {Run} / pointer
// member lines are the added side -- confirm against the actual commit.
class CopyAndRelocTask : public CancelableTask {
class CopyAndRelocTask : public JobTask {
public:
CopyAndRelocTask(NativeModuleDeserializer* deserializer,
DeserializationQueue& from_queue,
DeserializationQueue& to_queue,
CancelableTaskManager* task_manager)
: CancelableTask(task_manager),
deserializer_(deserializer),
DeserializationQueue* from_queue,
DeserializationQueue* to_queue,
std::shared_ptr<JobHandle> publish_handle)
: deserializer_(deserializer),
from_queue_(from_queue),
to_queue_(to_queue) {}
to_queue_(to_queue),
publish_handle_(std::move(publish_handle)) {}
void RunInternal() override {
// Processes batches until {Pop} returns null or {delegate} asks to yield.
void Run(JobDelegate* delegate) override {
CODE_SPACE_WRITE_SCOPE
for (;;) {
auto batch = from_queue_.Pop();
if (!batch) break;
for (auto& unit : *batch) {
while (auto batch = from_queue_->Pop()) {
for (const auto& unit : *batch) {
deserializer_->CopyAndRelocate(unit);
}
to_queue_.Add(std::move(batch));
to_queue_->Add(std::move(batch));
// A new batch is now in {to_queue_}; tell the publish job about it.
publish_handle_->NotifyConcurrencyIncrease();
if (delegate->ShouldYield()) return;
}
// A null batch is pushed to {to_queue_} once the loop above has ended.
to_queue_.Add(nullptr);
}
// Returns 0 if a worker is already active; otherwise 1 if {from_queue_}
// holds at least one batch, else 0.
size_t GetMaxConcurrency(size_t worker_count) const override {
// Run the CopyAndRelocTask in a single thread for now. We can later bump
// this to see if it improves performance.
if (worker_count > 0) return 0;
return std::min(size_t{1}, from_queue_->NumBatches());
}
private:
NativeModuleDeserializer* deserializer_;
DeserializationQueue& from_queue_;
DeserializationQueue& to_queue_;
NativeModuleDeserializer* const deserializer_;
DeserializationQueue* const from_queue_;
DeserializationQueue* const to_queue_;
// Handle that {Run} notifies after each batch is added to {to_queue_}.
std::shared_ptr<JobHandle> const publish_handle_;
};
// Takes batches from {from_queue} and passes each one to
// {NativeModuleDeserializer::Publish}.
// NOTE(review): this listing is a diff hunk whose +/- markers were stripped,
// so two class headers, two constructors and two run methods appear
// interleaved. Presumably the {CancelableTask} / {RunInternal} / reference
// member lines are the removed side and the {JobTask} / {Run} / pointer
// member lines are the added side -- confirm against the actual commit.
class PublishTask : public CancelableTask {
class PublishTask : public JobTask {
public:
PublishTask(NativeModuleDeserializer* deserializer,
DeserializationQueue& from_queue,
CancelableTaskManager* task_manager)
: CancelableTask(task_manager),
deserializer_(deserializer),
from_queue_(from_queue) {}
DeserializationQueue* from_queue)
: deserializer_(deserializer), from_queue_(from_queue) {}
void RunInternal() override {
// Publishes batches until {Pop} returns null or {delegate} asks to yield.
void Run(JobDelegate* delegate) override {
WasmCodeRefScope code_scope;
for (;;) {
auto batch = from_queue_.Pop();
if (!batch) break;
while (auto batch = from_queue_->Pop()) {
deserializer_->Publish(std::move(batch));
if (delegate->ShouldYield()) return;
}
}
// Returns 0 if a worker is already active; otherwise 1 if {from_queue_}
// holds at least one batch, else 0.
size_t GetMaxConcurrency(size_t worker_count) const override {
// Publishing is sequential anyway, so never return more than 1. If a
// worker is already running, don't spawn a second one.
if (worker_count > 0) return 0;
return std::min(size_t{1}, from_queue_->NumBatches());
}
private:
NativeModuleDeserializer* deserializer_;
DeserializationQueue& from_queue_;
NativeModuleDeserializer* const deserializer_;
DeserializationQueue* const from_queue_;
};
NativeModuleDeserializer::NativeModuleDeserializer(NativeModule* native_module)
......@@ -596,18 +601,17 @@ bool NativeModuleDeserializer::Read(Reader* reader) {
DeserializationQueue reloc_queue;
DeserializationQueue publish_queue;
CancelableTaskManager cancelable_task_manager;
auto copy_task = std::make_unique<CopyAndRelocTask>(
this, reloc_queue, publish_queue, &cancelable_task_manager);
V8::GetCurrentPlatform()->CallOnWorkerThread(std::move(copy_task));
std::shared_ptr<JobHandle> publish_handle = V8::GetCurrentPlatform()->PostJob(
TaskPriority::kUserVisible,
std::make_unique<PublishTask>(this, &publish_queue));
auto publish_task = std::make_unique<PublishTask>(this, publish_queue,
&cancelable_task_manager);
V8::GetCurrentPlatform()->CallOnWorkerThread(std::move(publish_task));
std::unique_ptr<JobHandle> copy_and_reloc_handle =
V8::GetCurrentPlatform()->PostJob(
TaskPriority::kUserVisible,
std::make_unique<CopyAndRelocTask>(this, &reloc_queue, &publish_queue,
publish_handle));
auto batch = std::make_unique<std::vector<DeserializationUnit>>();
int num_batches = 0;
const byte* batch_start = reader->current_location();
for (uint32_t i = first_wasm_fn; i < total_fns; ++i) {
DeserializationUnit unit = ReadCodeAndAlloc(i, reader);
......@@ -618,49 +622,21 @@ bool NativeModuleDeserializer::Read(Reader* reader) {
constexpr int kMinBatchSizeInBytes = 100000;
if (batch_size_in_bytes >= kMinBatchSizeInBytes) {
reloc_queue.Add(std::move(batch));
num_batches++;
batch = std::make_unique<std::vector<DeserializationUnit>>();
batch_start = reader->current_location();
copy_and_reloc_handle->NotifyConcurrencyIncrease();
}
}
if (!batch->empty()) {
reloc_queue.Add(std::move(batch));
num_batches++;
copy_and_reloc_handle->NotifyConcurrencyIncrease();
}
reloc_queue.Add(nullptr);
// Participate to deserialization in the main thread to ensure progress even
// if background tasks are not scheduled.
int published = 0;
{
CODE_SPACE_WRITE_SCOPE
for (;;) {
auto batch = reloc_queue.Pop();
if (!batch) break;
for (auto& unit : *batch) {
CopyAndRelocate(unit);
}
Publish(std::move(batch));
++published;
}
}
if (published == num_batches) {
// {CopyAndRelocTask} did not take any work from the reloc queue, probably
// because it was not scheduled yet. Ensure that the end marker gets added
// to the queue in this case.
publish_queue.Add(nullptr);
}
cancelable_task_manager.CancelAndWait();
// Wait for all tasks to finish, while participating in their work.
copy_and_reloc_handle->Join();
publish_handle->Join();
// Process the publish queue now in case {PublishTask} was canceled.
for (;;) {
auto batch = publish_queue.UnlockedPop();
if (!batch) break;
Publish(std::move(batch));
}
DCHECK_EQ(total_published_.load(), num_batches);
return reader->current_size() == 0;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment