Commit de49b574 authored by Gabriel Charette, committed by Commit Bot

Reland "Smoother distribution of worker assignment in parallel task array."

This is a reland of 76195d9e.

It was reverted because the new parallel tasks (with higher number
of workers) hang on client.v8.ports bots. Since each test task steals
the worker thread it's assigned but only processes one item before
waiting for completion by others: I think the problem is that there
aren't enough workers in client.v8.ports' config. There aren't any
try bots for this config... reduce the tests to use 4 tasks and
hope for the best (i.e. a 4 core machine that uses "num cores")...

Original change's description:
> Smoother distribution of worker assignment in parallel task array.
>
> This is a merge of https://chromium-review.googlesource.com/c/v8/v8/+/888704
> and https://chromium-review.googlesource.com/c/v8/v8/+/887084
>
> Which implements the fix in CL 887084 correctly in a world where
> there can be more tasks_ than items_ (crbug.com/806237).
>
> Bug: chromium:805932
> Change-Id: I05401be4fdce442644a8973281a9d88bd959b271
> Reviewed-on: https://chromium-review.googlesource.com/892883
> Commit-Queue: Gabriel Charette <gab@chromium.org>
> Reviewed-by: Michael Lippautz <mlippautz@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#50956}

Reverted-on: https://chromium-review.googlesource.com/893462

Bug: chromium:805932
Change-Id: I4d0bda3b9f52e9160e613a8f34a95e48b814bb9e
Reviewed-on: https://chromium-review.googlesource.com/893362
Reviewed-by: Hannes Payer <hpayer@chromium.org>
Commit-Queue: Gabriel Charette <gab@chromium.org>
Cr-Commit-Position: refs/heads/master@{#50967}
parent 16f2bcdb
......@@ -7,8 +7,10 @@
#include <vector>
#include "src/base/macros.h"
#include "src/base/platform/semaphore.h"
#include "src/cancelable-task.h"
#include "src/utils.h"
#include "src/v8.h"
namespace v8 {
......@@ -85,11 +87,18 @@ class ItemParallelJob {
}
private:
// Sets up state required before invoking Run(). If
// |start_index| >= |items_.size()|, this task will not process work items
// (some jobs have more tasks than work items in order to parallelize post-
// processing, e.g. scavenging).
void SetupInternal(base::Semaphore* on_finish, std::vector<Item*>* items,
size_t start_index) {
on_finish_ = on_finish;
items_ = items;
cur_index_ = start_index;
if (start_index < items->size())
cur_index_ = start_index;
else
items_considered_ = items_->size();
}
// We don't allow overriding this method any further.
......@@ -132,20 +141,39 @@ class ItemParallelJob {
int NumberOfTasks() const { return static_cast<int>(tasks_.size()); }
void Run() {
DCHECK_GE(tasks_.size(), 0);
const size_t num_tasks = tasks_.size();
DCHECK_GT(tasks_.size(), 0);
const size_t num_items = items_.size();
const size_t items_per_task = (num_items + num_tasks - 1) / num_tasks;
const size_t num_tasks = tasks_.size();
// Some jobs have more tasks than items (when the items are mere coarse
// grain tasks that generate work dynamically for a second phase which all
// tasks participate in). Some jobs even have 0 items to preprocess but
// still have multiple tasks.
// TODO(gab): Figure out a cleaner scheme for this.
const size_t num_tasks_processing_items = Min(num_items, tasks_.size());
// In the event of an uneven workload, distribute an extra item to the first
// |items_remainder| tasks.
const size_t items_remainder = num_tasks_processing_items > 0
? num_items % num_tasks_processing_items
: 0;
// Base |items_per_task|, will be bumped by 1 for the first
// |items_remainder| tasks.
const size_t items_per_task = num_tasks_processing_items > 0
? num_items / num_tasks_processing_items
: 0;
CancelableTaskManager::Id* task_ids =
new CancelableTaskManager::Id[num_tasks];
size_t start_index = 0;
Task* main_task = nullptr;
Task* task = nullptr;
for (size_t i = 0; i < num_tasks; i++, start_index += items_per_task) {
task = tasks_[i];
if (start_index >= num_items) {
start_index -= num_items;
}
for (size_t i = 0, start_index = 0; i < num_tasks;
i++, start_index += items_per_task + (i < items_remainder ? 1 : 0)) {
Task* task = tasks_[i];
// By definition there are fewer |items_remainder| items to distribute
// than there are tasks processing items, so this cannot overflow while
// we are assigning work items.
DCHECK_IMPLIES(start_index >= num_items, i >= num_tasks_processing_items);
task->SetupInternal(pending_tasks_, &items_, start_index);
task_ids[i] = task->id();
if (i > 0) {
......@@ -155,9 +183,11 @@ class ItemParallelJob {
main_task = task;
}
}
// Contribute on main thread.
main_task->Run();
delete main_task;
// Wait for background tasks.
for (size_t i = 0; i < num_tasks; i++) {
if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
......
......@@ -23,22 +23,32 @@ class ItemParallelJobTest : public TestWithIsolate {
namespace {
class EmptyTask : public ItemParallelJob::Task {
class SimpleTask : public ItemParallelJob::Task {
public:
explicit EmptyTask(Isolate* isolate, bool* did_run)
SimpleTask(Isolate* isolate, bool* did_run)
: ItemParallelJob::Task(isolate), did_run_(did_run) {}
void RunInParallel() override { *did_run_ = true; }
void RunInParallel() override {
ItemParallelJob::Item* item = nullptr;
while ((item = GetItem<ItemParallelJob::Item>()) != nullptr) {
item->MarkFinished();
}
*did_run_ = true;
}
private:
bool* did_run_;
};
// A simple work item which sets |was_processed| to true, if non-null, when it
// is processed.
class SimpleItem : public ItemParallelJob::Item {
public:
explicit SimpleItem(bool* was_processed)
explicit SimpleItem(bool* was_processed = nullptr)
: ItemParallelJob::Item(), was_processed_(was_processed) {}
void Process() { *was_processed_ = true; }
void Process() {
if (was_processed_) *was_processed_ = true;
}
private:
bool* was_processed_;
......@@ -83,22 +93,38 @@ class OneShotBarrier {
size_t counter_;
};
// A task that only processes a single item. If |did_process_an_item| is
// non-null, will set it to true if it does process an item. Otherwise, it will
// expect to get an item to process (and will report a failure if it doesn't).
class TaskProcessingOneItem : public ItemParallelJob::Task {
public:
explicit TaskProcessingOneItem(Isolate* isolate, OneShotBarrier* barrier)
: ItemParallelJob::Task(isolate), barrier_(barrier) {}
TaskProcessingOneItem(Isolate* isolate, OneShotBarrier* barrier,
bool* did_process_an_item = nullptr)
: ItemParallelJob::Task(isolate),
barrier_(barrier),
did_process_an_item_(did_process_an_item) {}
void RunInParallel() override {
SimpleItem* item = GetItem<SimpleItem>();
EXPECT_NE(nullptr, item);
item->Process();
item->MarkFinished();
if (did_process_an_item_) {
*did_process_an_item_ = item != nullptr;
} else {
EXPECT_NE(nullptr, item);
}
if (item) {
item->Process();
item->MarkFinished();
}
// Avoid canceling the remaining tasks with a simple barrier.
barrier_->Wait();
}
private:
OneShotBarrier* barrier_;
bool* did_process_an_item_;
};
class TaskForDifferentItems;
......@@ -148,21 +174,66 @@ class ItemB : public BaseItem {
} // namespace
TEST_F(ItemParallelJobTest, EmptyTaskRuns) {
// ItemParallelJob runs tasks even without work items (as requested tasks may be
// responsible for post-processing).
TEST_F(ItemParallelJobTest, SimpleTaskWithNoItemsRuns) {
bool did_run = false;
ItemParallelJob job(i_isolate()->cancelable_task_manager(),
parallel_job_semaphore());
job.AddTask(new EmptyTask(i_isolate(), &did_run));
job.AddTask(new SimpleTask(i_isolate(), &did_run));
job.Run();
EXPECT_TRUE(did_run);
}
TEST_F(ItemParallelJobTest, FinishAllItems) {
const int kItems = 111;
bool was_processed[kItems];
for (int i = 0; i < kItems; i++) {
was_processed[i] = false;
TEST_F(ItemParallelJobTest, SimpleTaskWithSimpleItemRuns) {
bool did_run = false;
ItemParallelJob job(i_isolate()->cancelable_task_manager(),
parallel_job_semaphore());
job.AddTask(new SimpleTask(i_isolate(), &did_run));
job.AddItem(new ItemParallelJob::Item);
job.Run();
EXPECT_TRUE(did_run);
}
TEST_F(ItemParallelJobTest, MoreTasksThanItems) {
// Note: this test will hang if the platform doesn't at least run kNumTasks
// worker threads.
const int kNumTasks = 4;
const int kNumItems = kNumTasks - 2;
TaskProcessingOneItem* tasks[kNumTasks] = {};
bool did_process_an_item[kNumTasks] = {};
ItemParallelJob job(i_isolate()->cancelable_task_manager(),
parallel_job_semaphore());
// The barrier ensures that all tasks run. But only the first kNumItems tasks
// should be assigned an item to execute.
OneShotBarrier barrier(kNumTasks);
for (int i = 0; i < kNumTasks; i++) {
tasks[i] = new TaskProcessingOneItem(i_isolate(), &barrier,
&did_process_an_item[i]);
job.AddTask(tasks[i]);
}
for (int i = 0; i < kNumItems; i++) {
job.AddItem(new SimpleItem);
}
job.Run();
for (int i = 0; i < kNumTasks; i++) {
// Only the first kNumItems tasks should have been assigned a work item.
EXPECT_EQ(i < kNumItems, did_process_an_item[i]);
}
}
TEST_F(ItemParallelJobTest, SingleThreadProcessing) {
const int kItems = 111;
bool was_processed[kItems] = {};
ItemParallelJob job(i_isolate()->cancelable_task_manager(),
parallel_job_semaphore());
job.AddTask(new EagerTask(i_isolate()));
......@@ -176,12 +247,11 @@ TEST_F(ItemParallelJobTest, FinishAllItems) {
}
TEST_F(ItemParallelJobTest, DistributeItemsMultipleTasks) {
const int kItemsAndTasks = 2; // Main thread + additional task.
bool was_processed[kItemsAndTasks];
// Note: this test will hang if the platform doesn't at least run
// kItemsAndTasks worker threads.
const int kItemsAndTasks = 4;
bool was_processed[kItemsAndTasks] = {};
OneShotBarrier barrier(kItemsAndTasks);
for (int i = 0; i < kItemsAndTasks; i++) {
was_processed[i] = false;
}
ItemParallelJob job(i_isolate()->cancelable_task_manager(),
parallel_job_semaphore());
for (int i = 0; i < kItemsAndTasks; i++) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment