Commit 396e7bc8 authored by Gabriel Charette, committed by Commit Bot

Revert "Smoother distribution of worker assignment in parallel task array."

This reverts commit 76195d9e.

Reason for revert: The new parallel tests time out on the waterfall (I think because it's configured to use fewer worker threads, and TaskProcessingOneItem is currently designed to steal a worker but only process one item...).

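The failure mode, as I read it: each TaskProcessingOneItem parks on a barrier that only opens once every task in the job has arrived, so a bot that provides fewer concurrent worker threads than the job has tasks leaves the running tasks blocked in the barrier while the queued tasks never start. Below is a minimal standalone sketch of that shape; it uses plain C++ threads rather than V8's task runner, and kNumTasks, kNumWorkers, and the timed wait are illustrative assumptions, not values from the bots.

#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

int main() {
  const int kNumTasks = 4;    // Tasks the job posts (assumed value).
  const int kNumWorkers = 2;  // Worker threads the platform provides.

  std::mutex mutex;
  std::condition_variable cv;
  int arrived = 0;
  bool timed_out = false;

  // Each task arrives at a one-shot barrier and waits for all its peers.
  auto task = [&] {
    std::unique_lock<std::mutex> lock(mutex);
    if (++arrived == kNumTasks) {
      cv.notify_all();
    } else if (!cv.wait_for(lock, std::chrono::seconds(2),
                            [&] { return arrived == kNumTasks; })) {
      timed_out = true;  // A real barrier would block here forever.
    }
  };

  // A fixed-size pool: at most kNumWorkers tasks are in flight at once.
  std::queue<std::function<void()>> pending;
  for (int i = 0; i < kNumTasks; i++) pending.push(task);

  std::vector<std::thread> workers;
  for (int i = 0; i < kNumWorkers; i++) {
    workers.emplace_back([&] {
      for (;;) {
        std::function<void()> next;
        {
          std::lock_guard<std::mutex> lock(mutex);
          if (pending.empty()) return;
          next = std::move(pending.front());
          pending.pop();
        }
        next();  // Blocks at the barrier; this worker takes no more work.
      }
    });
  }
  for (auto& worker : workers) worker.join();

  std::printf("%s\n", timed_out ? "stalled: barrier never filled in time"
                                : "all tasks reached the barrier");
}

With kNumWorkers >= kNumTasks the barrier fills and the program reports success; with fewer workers the waits expire. Without the timed fallback this would be a hang, which is what the waterfall reports as a timeout.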
Original change's description:
> Smoother distribution of worker assignment in parallel task array.
> 
> This is a merge of https://chromium-review.googlesource.com/c/v8/v8/+/888704
> and https://chromium-review.googlesource.com/c/v8/v8/+/887084
> 
> It implements the fix in CL 887084 correctly in a world where
> there can be more tasks_ than items_ (crbug.com/806237).
> 
> Bug: chromium:805932
> Change-Id: I05401be4fdce442644a8973281a9d88bd959b271
> Reviewed-on: https://chromium-review.googlesource.com/892883
> Commit-Queue: Gabriel Charette <gab@chromium.org>
> Reviewed-by: Michael Lippautz <mlippautz@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#50956}

TBR=gab@chromium.org,hpayer@chromium.org,mlippautz@chromium.org

Change-Id: Icf52eb3afeb9467557c1e0db6922d590466943f0
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: chromium:805932
Reviewed-on: https://chromium-review.googlesource.com/893462
Reviewed-by: Hannes Payer <hpayer@chromium.org>
Commit-Queue: Gabriel Charette <gab@chromium.org>
Cr-Commit-Position: refs/heads/master@{#50965}
parent 0db74d49
@@ -7,10 +7,8 @@
 #include <vector>

-#include "src/base/macros.h"
 #include "src/base/platform/semaphore.h"
 #include "src/cancelable-task.h"
-#include "src/utils.h"
 #include "src/v8.h"

 namespace v8 {
@@ -87,18 +85,11 @@ class ItemParallelJob {
   }

  private:
-  // Sets up state required before invoking Run(). If
-  // |start_index is >= items_.size()|, this task will not process work items
-  // (some jobs have more tasks than work items in order to parallelize post-
-  // processing, e.g. scavenging).
   void SetupInternal(base::Semaphore* on_finish, std::vector<Item*>* items,
                      size_t start_index) {
     on_finish_ = on_finish;
     items_ = items;
-    if (start_index < items->size())
-      cur_index_ = start_index;
-    else
-      items_considered_ = items_->size();
+    cur_index_ = start_index;
   }

   // We don't allow overriding this method any further.
@@ -141,39 +132,20 @@ class ItemParallelJob {
   int NumberOfTasks() const { return static_cast<int>(tasks_.size()); }

   void Run() {
-    DCHECK_GT(tasks_.size(), 0);
-    const size_t num_items = items_.size();
+    DCHECK_GE(tasks_.size(), 0);
     const size_t num_tasks = tasks_.size();
-
-    // Some jobs have more tasks than items (when the items are mere coarse
-    // grain tasks that generate work dynamically for a second phase which all
-    // tasks participate in). Some jobs even have 0 items to preprocess but
-    // still have multiple tasks.
-    // TODO(gab): Figure out a cleaner scheme for this.
-    const size_t num_tasks_processing_items = Min(num_items, tasks_.size());
-
-    // In the event of an uneven workload, distribute an extra item to the first
-    // |items_remainder| tasks.
-    const size_t items_remainder = num_tasks_processing_items > 0
-                                       ? num_items % num_tasks_processing_items
-                                       : 0;
-    // Base |items_per_task|, will be bumped by 1 for the first
-    // |items_remainder| tasks.
-    const size_t items_per_task = num_tasks_processing_items > 0
-                                      ? num_items / num_tasks_processing_items
-                                      : 0;
+    const size_t num_items = items_.size();
+    const size_t items_per_task = (num_items + num_tasks - 1) / num_tasks;
     CancelableTaskManager::Id* task_ids =
         new CancelableTaskManager::Id[num_tasks];
+    size_t start_index = 0;
     Task* main_task = nullptr;
-    for (size_t i = 0, start_index = 0; i < num_tasks;
-         i++, start_index += items_per_task + (i < items_remainder ? 1 : 0)) {
-      Task* task = tasks_[i];
-      // By definition there are less |items_remainder| to distribute then
-      // there are tasks processing items so this cannot overflow while we are
-      // assigning work items.
-      DCHECK_IMPLIES(start_index >= num_items, i >= num_tasks_processing_items);
+    Task* task = nullptr;
+    for (size_t i = 0; i < num_tasks; i++, start_index += items_per_task) {
+      task = tasks_[i];
+      if (start_index >= num_items) {
+        start_index -= num_items;
+      }
       task->SetupInternal(pending_tasks_, &items_, start_index);
       task_ids[i] = task->id();
       if (i > 0) {
@@ -183,11 +155,9 @@ class ItemParallelJob {
         main_task = task;
       }
     }
-
     // Contribute on main thread.
     main_task->Run();
     delete main_task;
-
     // Wait for background tasks.
     for (size_t i = 0; i < num_tasks; i++) {
       if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
...
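To make the Run() hunk above concrete, here is a standalone sketch (not V8 code; num_items and num_tasks are arbitrary example values) that prints the per-task start indices under both schemes: the restored ceiling division with wraparound, and the reverted floor division with the remainder handed out near the front.

#include <algorithm>
#include <cstddef>
#include <cstdio>

int main() {
  const size_t num_items = 8;  // Arbitrary example values.
  const size_t num_tasks = 5;

  // Restored scheme: ceiling division; a start index that runs past the last
  // item wraps around to the front.
  {
    const size_t items_per_task = (num_items + num_tasks - 1) / num_tasks;
    size_t start_index = 0;
    std::printf("ceiling + wraparound:");
    for (size_t i = 0; i < num_tasks; i++, start_index += items_per_task) {
      if (start_index >= num_items) start_index -= num_items;
      std::printf(" %zu", start_index);
    }
    std::printf("\n");
  }

  // Reverted scheme: floor division over the tasks that actually get items,
  // plus one extra item for some early tasks; a task whose start index lands
  // at or past the item count processes nothing. The loop header mirrors the
  // reverted code verbatim, including its post-increment remainder test.
  {
    const size_t workers = std::min(num_items, num_tasks);
    const size_t remainder = workers > 0 ? num_items % workers : 0;
    const size_t per_task = workers > 0 ? num_items / workers : 0;
    std::printf("floor + remainder:   ");
    for (size_t i = 0, start_index = 0; i < num_tasks;
         i++, start_index += per_task + (i < remainder ? 1 : 0)) {
      std::printf(" %zu", std::min(start_index, num_items));
    }
    std::printf("\n");
  }
  return 0;
}

My understanding is that the start index only determines where a task begins scanning; items are claimed atomically in GetItem(), so the overlapping start positions the wraparound produces when there are more tasks than items cost contention rather than correctness, which is what the reverted CL set out to clean up.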
@@ -23,32 +23,22 @@ class ItemParallelJobTest : public TestWithIsolate {

 namespace {

-class SimpleTask : public ItemParallelJob::Task {
+class EmptyTask : public ItemParallelJob::Task {
  public:
-  SimpleTask(Isolate* isolate, bool* did_run)
+  explicit EmptyTask(Isolate* isolate, bool* did_run)
       : ItemParallelJob::Task(isolate), did_run_(did_run) {}

-  void RunInParallel() override {
-    ItemParallelJob::Item* item = nullptr;
-    while ((item = GetItem<ItemParallelJob::Item>()) != nullptr) {
-      item->MarkFinished();
-    }
-    *did_run_ = true;
-  }
+  void RunInParallel() override { *did_run_ = true; }

  private:
   bool* did_run_;
 };

-// A simple work item which sets |was_processed| to true, if non-null, when it
-// is processed.
 class SimpleItem : public ItemParallelJob::Item {
  public:
-  explicit SimpleItem(bool* was_processed = nullptr)
+  explicit SimpleItem(bool* was_processed)
       : ItemParallelJob::Item(), was_processed_(was_processed) {}

-  void Process() {
-    if (was_processed_) *was_processed_ = true;
-  }
+  void Process() { *was_processed_ = true; }

  private:
   bool* was_processed_;
@@ -93,38 +83,22 @@ class OneShotBarrier {
   size_t counter_;
 };

-// A task that only processes a single item. If |did_process_an_item| is
-// non-null, will set it to true if it does process an item. Otherwise, it will
-// expect to get an item to process (and will report a failure if it doesn't).
 class TaskProcessingOneItem : public ItemParallelJob::Task {
  public:
-  TaskProcessingOneItem(Isolate* isolate, OneShotBarrier* barrier,
-                        bool* did_process_an_item = nullptr)
-      : ItemParallelJob::Task(isolate),
-        barrier_(barrier),
-        did_process_an_item_(did_process_an_item) {}
+  explicit TaskProcessingOneItem(Isolate* isolate, OneShotBarrier* barrier)
+      : ItemParallelJob::Task(isolate), barrier_(barrier) {}

   void RunInParallel() override {
     SimpleItem* item = GetItem<SimpleItem>();
-    if (did_process_an_item_) {
-      *did_process_an_item_ = item != nullptr;
-    } else {
-      EXPECT_NE(nullptr, item);
-    }
-    if (item) {
-      item->Process();
-      item->MarkFinished();
-    }
+    EXPECT_NE(nullptr, item);
+    item->Process();
+    item->MarkFinished();
     // Avoid canceling the remaining tasks with a simple barrier.
     barrier_->Wait();
   }

  private:
   OneShotBarrier* barrier_;
-  bool* did_process_an_item_;
 };

 class TaskForDifferentItems;
@@ -174,64 +148,21 @@ class ItemB : public BaseItem {

 }  // namespace

-// ItemParallelJob runs tasks even without work items (as requested tasks may be
-// responsible for post-processing).
-TEST_F(ItemParallelJobTest, SimpleTaskWithNoItemsRuns) {
-  bool did_run = false;
-  ItemParallelJob job(i_isolate()->cancelable_task_manager(),
-                      parallel_job_semaphore());
-  job.AddTask(new SimpleTask(i_isolate(), &did_run));
-  job.Run();
-  EXPECT_TRUE(did_run);
-}
-
-TEST_F(ItemParallelJobTest, SimpleTaskWithSimpleItemRuns) {
+TEST_F(ItemParallelJobTest, EmptyTaskRuns) {
   bool did_run = false;
   ItemParallelJob job(i_isolate()->cancelable_task_manager(),
                       parallel_job_semaphore());
-  job.AddTask(new SimpleTask(i_isolate(), &did_run));
-  job.AddItem(new ItemParallelJob::Item);
+  job.AddTask(new EmptyTask(i_isolate(), &did_run));
   job.Run();
   EXPECT_TRUE(did_run);
 }

-TEST_F(ItemParallelJobTest, MoreTasksThanItems) {
-  const int kNumTasks = 5;
-  const int kNumItems = kNumTasks - 2;
-  TaskProcessingOneItem* tasks[kNumTasks] = {};
-  bool did_process_an_item[kNumTasks] = {};
-  ItemParallelJob job(i_isolate()->cancelable_task_manager(),
-                      parallel_job_semaphore());
-  // The barrier ensures that all tasks run. But only the first kNumItems tasks
-  // should be assigned an item to execute.
-  OneShotBarrier barrier(kNumTasks);
-  for (int i = 0; i < kNumTasks; i++) {
-    tasks[i] = new TaskProcessingOneItem(i_isolate(), &barrier,
-                                         &did_process_an_item[i]);
-    job.AddTask(tasks[i]);
-  }
-  for (int i = 0; i < kNumItems; i++) {
-    job.AddItem(new SimpleItem);
-  }
-  job.Run();
-  for (int i = 0; i < kNumTasks; i++) {
-    // Only the first kNumItems tasks should have been assigned a work item.
-    EXPECT_EQ(i < kNumItems, did_process_an_item[i]);
-  }
-}
-
-TEST_F(ItemParallelJobTest, SingleThreadProcessing) {
+TEST_F(ItemParallelJobTest, FinishAllItems) {
   const int kItems = 111;
-  bool was_processed[kItems] = {};
+  bool was_processed[kItems];
+  for (int i = 0; i < kItems; i++) {
+    was_processed[i] = false;
+  }
   ItemParallelJob job(i_isolate()->cancelable_task_manager(),
                       parallel_job_semaphore());
   job.AddTask(new EagerTask(i_isolate()));
@@ -245,9 +176,12 @@ TEST_F(ItemParallelJobTest, SingleThreadProcessing) {
 }

 TEST_F(ItemParallelJobTest, DistributeItemsMultipleTasks) {
-  const int kItemsAndTasks = 8;
-  bool was_processed[kItemsAndTasks] = {};
+  const int kItemsAndTasks = 2;  // Main thread + additional task.
+  bool was_processed[kItemsAndTasks];
   OneShotBarrier barrier(kItemsAndTasks);
+  for (int i = 0; i < kItemsAndTasks; i++) {
+    was_processed[i] = false;
+  }
   ItemParallelJob job(i_isolate()->cancelable_task_manager(),
                       parallel_job_semaphore());
   for (int i = 0; i < kItemsAndTasks; i++) {
...
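For reference, the tests above synchronize through the OneShotBarrier helper defined earlier in the unittest file; only its counter_ member is visible in the hunks. A minimal sketch of such a one-shot barrier, assuming a standard mutex/condition-variable shape rather than V8's base:: primitives:

#include <condition_variable>
#include <cstddef>
#include <mutex>

class OneShotBarrier {
 public:
  explicit OneShotBarrier(size_t count) : count_(count) {}

  // Blocks until |count_| threads have called Wait(). Usable exactly once.
  void Wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    ++counter_;
    if (counter_ == count_) {
      cv_.notify_all();
    } else {
      cv_.wait(lock, [this] { return counter_ == count_; });
    }
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  const size_t count_;
  size_t counter_ = 0;
};

Sized to the number of tasks, Wait() returns only after every task has arrived, which is precisely the property that turns into a hang when the platform runs fewer worker threads than the job has tasks (the revert reason above).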