Commit b02bb408 authored by Michael Lippautz's avatar Michael Lippautz Committed by Commit Bot

[heap] Add ItemParallelJob

Adds a generic job that is based on items and tasks.

Bug: chromium:651354
Change-Id: I378e04741c5761ea6c4a74816b9af8ea22867f53
Reviewed-on: https://chromium-review.googlesource.com/506075
Commit-Queue: Michael Lippautz <mlippautz@chromium.org>
Reviewed-by: 's avatarHannes Payer <hpayer@chromium.org>
Reviewed-by: 's avatarUlan Degenbaev <ulan@chromium.org>
Cr-Commit-Position: refs/heads/master@{#45353}
parent a2304802
...@@ -1583,6 +1583,7 @@ v8_source_set("v8_base") { ...@@ -1583,6 +1583,7 @@ v8_source_set("v8_base") {
"src/heap/incremental-marking-job.h", "src/heap/incremental-marking-job.h",
"src/heap/incremental-marking.cc", "src/heap/incremental-marking.cc",
"src/heap/incremental-marking.h", "src/heap/incremental-marking.h",
"src/heap/item-parallel-job.h",
"src/heap/mark-compact-inl.h", "src/heap/mark-compact-inl.h",
"src/heap/mark-compact.cc", "src/heap/mark-compact.cc",
"src/heap/mark-compact.h", "src/heap/mark-compact.h",
......
// Copyright 2017 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef V8_HEAP_ITEM_PARALLEL_JOB_
#define V8_HEAP_ITEM_PARALLEL_JOB_
#include <vector>
#include "src/base/platform/semaphore.h"
#include "src/cancelable-task.h"
#include "src/v8.h"
namespace v8 {
namespace internal {
class Isolate;
// This class manages background tasks that process a set of items in parallel.
// The first task added is executed on the same thread as |job.Run()| is called.
// All other tasks are scheduled in the background.
//
// - Items need to inherit from ItemParallelJob::Item.
// - Tasks need to inherit from ItemParallelJob::Task.
//
// Items need to be marked as finished after processing them. Task and Item
// ownership is transferred to the job.
class ItemParallelJob {
public:
class Task;
class Item {
public:
Item() : state_(kAvailable) {}
virtual ~Item() {}
// Marks an item as being finished.
void MarkFinished() { CHECK(state_.TrySetValue(kProcessing, kFinished)); }
private:
enum ProcessingState { kAvailable, kProcessing, kFinished };
bool TryMarkingAsProcessing() {
return state_.TrySetValue(kAvailable, kProcessing);
}
bool IsFinished() { return state_.Value() == kFinished; }
base::AtomicValue<ProcessingState> state_;
friend class ItemParallelJob;
friend class ItemParallelJob::Task;
DISALLOW_COPY_AND_ASSIGN(Item);
};
class Task : public CancelableTask {
public:
explicit Task(Isolate* isolate)
: CancelableTask(isolate),
items_(nullptr),
cur_index_(0),
items_considered_(0),
on_finish_(nullptr) {}
virtual ~Task() {}
virtual void RunInParallel() = 0;
protected:
// Retrieves a new item that needs to be processed. Returns |nullptr| if
// all items are processed. Upon returning an item, the task is required
// to process the item and mark the item as finished after doing so.
template <class ItemType>
ItemType* GetItem() {
while (items_considered_++ != items_->size()) {
// Wrap around.
if (cur_index_ == items_->size()) {
cur_index_ = 0;
}
Item* item = (*items_)[cur_index_++];
if (item->TryMarkingAsProcessing()) {
return static_cast<ItemType*>(item);
}
}
return nullptr;
}
private:
void SetupInternal(base::Semaphore* on_finish, std::vector<Item*>* items,
size_t start_index) {
on_finish_ = on_finish;
items_ = items;
cur_index_ = start_index;
}
// We don't allow overriding this method any further.
void RunInternal() final {
RunInParallel();
on_finish_->Signal();
}
std::vector<Item*>* items_;
size_t cur_index_;
size_t items_considered_;
base::Semaphore* on_finish_;
friend class ItemParallelJob;
friend class Item;
DISALLOW_COPY_AND_ASSIGN(Task);
};
ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
base::Semaphore* pending_tasks)
: cancelable_task_manager_(cancelable_task_manager),
pending_tasks_(pending_tasks) {}
~ItemParallelJob() {
for (size_t i = 0; i < items_.size(); i++) {
Item* item = items_[i];
CHECK(item->IsFinished());
delete item;
}
}
// Adds a task to the job. Transfers ownership to the job.
void AddTask(Task* task) { tasks_.push_back(task); }
// Adds an item to the job. Transfers ownership to the job.
void AddItem(Item* item) { items_.push_back(item); }
void Run() {
DCHECK_GE(tasks_.size(), 0);
const size_t num_tasks = tasks_.size();
const size_t num_items = items_.size();
const size_t items_per_task = (num_items + num_tasks - 1) / num_tasks;
uint32_t* task_ids = new uint32_t[num_tasks];
size_t start_index = 0;
Task* main_task = nullptr;
Task* task = nullptr;
for (size_t i = 0; i < num_tasks; i++, start_index += items_per_task) {
task = tasks_[i];
if (start_index >= num_items) {
start_index -= num_items;
}
task->SetupInternal(pending_tasks_, &items_, start_index);
task_ids[i] = task->id();
if (i > 0) {
V8::GetCurrentPlatform()->CallOnBackgroundThread(
task, v8::Platform::kShortRunningTask);
} else {
main_task = task;
}
}
// Contribute on main thread.
main_task->Run();
delete main_task;
// Wait for background tasks.
for (size_t i = 0; i < num_tasks; i++) {
if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
CancelableTaskManager::kTaskAborted) {
pending_tasks_->Wait();
}
}
delete[] task_ids;
}
private:
std::vector<Item*> items_;
std::vector<Task*> tasks_;
CancelableTaskManager* cancelable_task_manager_;
base::Semaphore* pending_tasks_;
DISALLOW_COPY_AND_ASSIGN(ItemParallelJob);
};
} // namespace internal
} // namespace v8
#endif // V8_HEAP_ITEM_PARALLEL_JOB_
...@@ -968,6 +968,7 @@ ...@@ -968,6 +968,7 @@
'heap/incremental-marking-job.h', 'heap/incremental-marking-job.h',
'heap/incremental-marking.cc', 'heap/incremental-marking.cc',
'heap/incremental-marking.h', 'heap/incremental-marking.h',
'heap/item-parallel-job.h',
'heap/mark-compact-inl.h', 'heap/mark-compact-inl.h',
'heap/mark-compact.cc', 'heap/mark-compact.cc',
'heap/mark-compact.h', 'heap/mark-compact.h',
......
...@@ -107,6 +107,7 @@ v8_executable("unittests") { ...@@ -107,6 +107,7 @@ v8_executable("unittests") {
"heap/gc-idle-time-handler-unittest.cc", "heap/gc-idle-time-handler-unittest.cc",
"heap/gc-tracer-unittest.cc", "heap/gc-tracer-unittest.cc",
"heap/heap-unittest.cc", "heap/heap-unittest.cc",
"heap/item-parallel-job-unittest.cc",
"heap/marking-unittest.cc", "heap/marking-unittest.cc",
"heap/memory-reducer-unittest.cc", "heap/memory-reducer-unittest.cc",
"heap/scavenge-job-unittest.cc", "heap/scavenge-job-unittest.cc",
......
// Copyright 2017 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/heap/item-parallel-job.h"
#include "src/isolate.h"
#include "test/unittests/test-utils.h"
namespace v8 {
namespace internal {
class ItemParallelJobTest : public TestWithIsolate {
public:
ItemParallelJobTest() : parallel_job_semaphore_(0) {}
base::Semaphore* parallel_job_semaphore() { return &parallel_job_semaphore_; }
private:
base::Semaphore parallel_job_semaphore_;
DISALLOW_COPY_AND_ASSIGN(ItemParallelJobTest);
};
namespace {
class EmptyTask : public ItemParallelJob::Task {
public:
explicit EmptyTask(Isolate* isolate, bool* did_run)
: ItemParallelJob::Task(isolate), did_run_(did_run) {}
void RunInParallel() override { *did_run_ = true; }
private:
bool* did_run_;
};
class SimpleItem : public ItemParallelJob::Item {
public:
explicit SimpleItem(bool* was_processed)
: ItemParallelJob::Item(), was_processed_(was_processed) {}
void Process() { *was_processed_ = true; }
private:
bool* was_processed_;
};
class EagerTask : public ItemParallelJob::Task {
public:
explicit EagerTask(Isolate* isolate) : ItemParallelJob::Task(isolate) {}
void RunInParallel() override {
SimpleItem* item = nullptr;
while ((item = GetItem<SimpleItem>()) != nullptr) {
item->Process();
item->MarkFinished();
}
}
};
class TaskProcessingOneItem : public ItemParallelJob::Task {
public:
explicit TaskProcessingOneItem(Isolate* isolate,
base::AtomicNumber<size_t>* count,
size_t finish)
: ItemParallelJob::Task(isolate), count_(count), finish_(finish) {}
void RunInParallel() override {
SimpleItem* item = GetItem<SimpleItem>();
EXPECT_NE(nullptr, item);
item->Process();
item->MarkFinished();
// Avoid canceling the remaining task if it has no started by
// busy looping.
count_->Increment(1);
while (count_->Value() != finish_) {
}
}
private:
base::AtomicNumber<size_t>* count_;
size_t finish_;
};
class TaskForDifferentItems;
class BaseItem : public ItemParallelJob::Item {
public:
virtual ~BaseItem() {}
virtual void ProcessItem(TaskForDifferentItems* task) = 0;
};
class TaskForDifferentItems : public ItemParallelJob::Task {
public:
explicit TaskForDifferentItems(Isolate* isolate, bool* processed_a,
bool* processed_b)
: ItemParallelJob::Task(isolate),
processed_a_(processed_a),
processed_b_(processed_b) {}
virtual ~TaskForDifferentItems() {}
void RunInParallel() override {
BaseItem* item = nullptr;
while ((item = GetItem<BaseItem>()) != nullptr) {
item->ProcessItem(this);
item->MarkFinished();
}
}
void ProcessA() { *processed_a_ = true; }
void ProcessB() { *processed_b_ = true; }
private:
bool* processed_a_;
bool* processed_b_;
};
class ItemA : public BaseItem {
public:
virtual ~ItemA() {}
void ProcessItem(TaskForDifferentItems* task) override { task->ProcessA(); }
};
class ItemB : public BaseItem {
public:
virtual ~ItemB() {}
void ProcessItem(TaskForDifferentItems* task) override { task->ProcessB(); }
};
} // namespace
TEST_F(ItemParallelJobTest, EmptyTaskRuns) {
bool did_run = false;
ItemParallelJob job(i_isolate()->cancelable_task_manager(),
parallel_job_semaphore());
job.AddTask(new EmptyTask(i_isolate(), &did_run));
job.Run();
EXPECT_TRUE(did_run);
}
TEST_F(ItemParallelJobTest, FinishAllItems) {
const int kItems = 111;
bool was_processed[kItems];
for (int i = 0; i < kItems; i++) {
was_processed[i] = false;
}
ItemParallelJob job(i_isolate()->cancelable_task_manager(),
parallel_job_semaphore());
job.AddTask(new EagerTask(i_isolate()));
for (int i = 0; i < kItems; i++) {
job.AddItem(new SimpleItem(&was_processed[i]));
}
job.Run();
for (int i = 0; i < kItems; i++) {
EXPECT_TRUE(was_processed[i]);
}
}
TEST_F(ItemParallelJobTest, DistributeItems) {
const int kItems = 4;
bool was_processed[kItems];
base::AtomicNumber<size_t> count;
for (int i = 0; i < kItems; i++) {
was_processed[i] = false;
}
ItemParallelJob job(i_isolate()->cancelable_task_manager(),
parallel_job_semaphore());
for (int i = 0; i < kItems; i++) {
job.AddItem(new SimpleItem(&was_processed[i]));
job.AddTask(new TaskProcessingOneItem(i_isolate(), &count, kItems));
}
job.Run();
for (int i = 0; i < kItems; i++) {
EXPECT_TRUE(was_processed[i]);
}
}
TEST_F(ItemParallelJobTest, DifferentItems) {
bool item_a = false;
bool item_b = false;
ItemParallelJob job(i_isolate()->cancelable_task_manager(),
parallel_job_semaphore());
job.AddItem(new ItemA());
job.AddItem(new ItemB());
job.AddTask(new TaskForDifferentItems(i_isolate(), &item_a, &item_b));
job.Run();
EXPECT_TRUE(item_a);
EXPECT_TRUE(item_b);
}
} // namespace internal
} // namespace v8
...@@ -103,6 +103,7 @@ ...@@ -103,6 +103,7 @@
'heap/embedder-tracing-unittest.cc', 'heap/embedder-tracing-unittest.cc',
'heap/gc-idle-time-handler-unittest.cc', 'heap/gc-idle-time-handler-unittest.cc',
'heap/gc-tracer-unittest.cc', 'heap/gc-tracer-unittest.cc',
'heap/item-parallel-job-unittest.cc',
'heap/marking-unittest.cc', 'heap/marking-unittest.cc',
'heap/memory-reducer-unittest.cc', 'heap/memory-reducer-unittest.cc',
'heap/heap-unittest.cc', 'heap/heap-unittest.cc',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment