Commit 7d58b3c4 authored by Gabriel Charette, committed by Commit Bot

Isolate ItemParallelJob implementation in CC file.

This CL is a pure code move based on top of
https://chromium-review.googlesource.com/c/v8/v8/+/899365

Having it all in the header was becoming more and more tedious:
it required large rebuilds for implementation changes and
it required exporting unrelated symbols merely so that unittests
could link, see
https://chromium-review.googlesource.com/c/v8/v8/+/899365
and https://chromium-review.googlesource.com/c/v8/v8/+/904523/2

R=mlippautz@chromium.org

Bug: chromium:651354
Change-Id: Ib34043d061dd3b1221cd06799eddc888090fe1c1
Reviewed-on: https://chromium-review.googlesource.com/904167
Reviewed-by: Michael Lippautz <mlippautz@chromium.org>
Commit-Queue: Gabriel Charette <gab@chromium.org>
Cr-Commit-Position: refs/heads/master@{#51132}
parent 85b8daed
@@ -1687,6 +1687,7 @@ v8_source_set("v8_base") {
"src/heap/invalidated-slots-inl.h",
"src/heap/invalidated-slots.cc",
"src/heap/invalidated-slots.h",
"src/heap/item-parallel-job.cc",
"src/heap/item-parallel-job.h",
"src/heap/local-allocator.h",
"src/heap/mark-compact-inl.h",
......
@@ -999,6 +999,7 @@
'../src/heap/invalidated-slots-inl.h',
'../src/heap/invalidated-slots.cc',
'../src/heap/invalidated-slots.h',
'../src/heap/item-parallel-job.cc',
'../src/heap/item-parallel-job.h',
'../src/heap/local-allocator.h',
'../src/heap/mark-compact-inl.h',
......
@@ -261,7 +261,7 @@ enum class HistogramTimerResolution { MILLISECOND, MICROSECOND };
// A thread safe histogram timer. It also allows distributions of
// nested timed results.
class V8_EXPORT_PRIVATE TimedHistogram : public Histogram {
class TimedHistogram : public Histogram {
public:
// Start the timer. Log if isolate non-null.
void Start(base::ElapsedTimer* timer, Isolate* isolate);
@@ -313,7 +313,7 @@ class TimedHistogramScope {
// AsyncTimedHistogram can be moved/copied to avoid computing Now() multiple
// times when the times of multiple tasks are identical; each copy will generate
// its own report.
class V8_EXPORT_PRIVATE AsyncTimedHistogram {
class AsyncTimedHistogram {
public:
explicit AsyncTimedHistogram(TimedHistogram* histogram,
std::shared_ptr<Counters> async_counters)
......
// Copyright 2018 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/heap/item-parallel-job.h"
#include "src/base/platform/semaphore.h"
#include "src/v8.h"
namespace v8 {
namespace internal {
ItemParallelJob::Task::Task(Isolate* isolate) : CancelableTask(isolate) {}
ItemParallelJob::Task::~Task() {
// The histogram is reset in RunInternal(). If it's still around it means
// this task was cancelled before being scheduled.
if (gc_parallel_task_latency_histogram_)
gc_parallel_task_latency_histogram_->RecordAbandon();
}
void ItemParallelJob::Task::SetupInternal(
base::Semaphore* on_finish, std::vector<Item*>* items, size_t start_index,
base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram) {
on_finish_ = on_finish;
items_ = items;
if (start_index < items->size()) {
cur_index_ = start_index;
} else {
items_considered_ = items_->size();
}
gc_parallel_task_latency_histogram_ =
std::move(gc_parallel_task_latency_histogram);
}
void ItemParallelJob::Task::RunInternal() {
if (gc_parallel_task_latency_histogram_) {
gc_parallel_task_latency_histogram_->RecordDone();
gc_parallel_task_latency_histogram_.reset();
}
RunInParallel();
on_finish_->Signal();
}
ItemParallelJob::ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
base::Semaphore* pending_tasks)
: cancelable_task_manager_(cancelable_task_manager),
pending_tasks_(pending_tasks) {}
ItemParallelJob::~ItemParallelJob() {
for (size_t i = 0; i < items_.size(); i++) {
Item* item = items_[i];
CHECK(item->IsFinished());
delete item;
}
}
void ItemParallelJob::Run(std::shared_ptr<Counters> async_counters) {
DCHECK_GT(tasks_.size(), 0);
const size_t num_items = items_.size();
const size_t num_tasks = tasks_.size();
AsyncTimedHistogram gc_parallel_task_latency_histogram(
async_counters->gc_parallel_task_latency(), async_counters);
// Some jobs have more tasks than items (when the items are merely
// coarse-grained work units that generate work dynamically for a second
// phase in which all tasks participate). Some jobs even have 0 items to
// preprocess but still have multiple tasks.
// TODO(gab): Figure out a cleaner scheme for this.
const size_t num_tasks_processing_items = Min(num_items, tasks_.size());
// In the event of an uneven workload, distribute an extra item to the first
// |items_remainder| tasks.
const size_t items_remainder = num_tasks_processing_items > 0
? num_items % num_tasks_processing_items
: 0;
// The base |items_per_task| will be bumped by 1 for the first
// |items_remainder| tasks.
const size_t items_per_task = num_tasks_processing_items > 0
? num_items / num_tasks_processing_items
: 0;
CancelableTaskManager::Id* task_ids =
new CancelableTaskManager::Id[num_tasks];
Task* main_task = nullptr;
for (size_t i = 0, start_index = 0; i < num_tasks;
i++, start_index += items_per_task + (i < items_remainder ? 1 : 0)) {
Task* task = tasks_[i];
// By definition there are fewer |items_remainder| items to distribute than
// there are tasks processing items, so this cannot overflow while we are
// assigning work items.
DCHECK_IMPLIES(start_index >= num_items, i >= num_tasks_processing_items);
task->SetupInternal(pending_tasks_, &items_, start_index,
i > 0 ? gc_parallel_task_latency_histogram
: base::Optional<AsyncTimedHistogram>());
task_ids[i] = task->id();
if (i > 0) {
V8::GetCurrentPlatform()->CallOnBackgroundThread(
task, v8::Platform::kShortRunningTask);
} else {
main_task = task;
}
}
// Contribute on main thread.
main_task->Run();
delete main_task;
// Wait for background tasks.
for (size_t i = 0; i < num_tasks; i++) {
if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
CancelableTaskManager::kTaskAborted) {
pending_tasks_->Wait();
}
}
delete[] task_ids;
}
} // namespace internal
} // namespace v8
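To make the distribution scheme in Run() concrete, here is a small standalone C++ sketch (an editorial illustration, not code from this CL; the helper name IntendedSplit is hypothetical). It computes the intended initial item count per task: an even base share for each item-processing task, plus one extra item for each of the first |items_remainder| tasks. The start index passed to SetupInternal() acts as a load-balancing hint; item state is tracked atomically (see Item::state_ in the header below), so the actual work need not end up split exactly this way.

#include <algorithm>
#include <cstddef>
#include <vector>

// Mirrors the arithmetic in ItemParallelJob::Run(): tasks beyond the item
// count start with no items, the rest get num_items / n each, and the first
// (num_items % n) of them get one more.
std::vector<size_t> IntendedSplit(size_t num_items, size_t num_tasks) {
  std::vector<size_t> chunks(num_tasks, 0);
  const size_t n = std::min(num_items, num_tasks);  // tasks processing items
  if (n == 0) return chunks;
  const size_t items_per_task = num_items / n;
  const size_t items_remainder = num_items % n;
  for (size_t i = 0; i < n; ++i) {
    chunks[i] = items_per_task + (i < items_remainder ? 1 : 0);
  }
  return chunks;
}

// Examples: IntendedSplit(10, 4) == {3, 3, 2, 2}
//           IntendedSplit(2, 4)  == {1, 1, 0, 0}  (the item-less tasks still
//           run and can pick up dynamically generated second-phase work)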
@@ -5,19 +5,26 @@
#ifndef V8_HEAP_ITEM_PARALLEL_JOB_H_
#define V8_HEAP_ITEM_PARALLEL_JOB_H_
#include <memory>
#include <vector>
#include "src/base/atomic-utils.h"
#include "src/base/logging.h"
#include "src/base/macros.h"
#include "src/base/optional.h"
#include "src/base/platform/semaphore.h"
#include "src/cancelable-task.h"
#include "src/counters.h"
#include "src/utils.h"
#include "src/v8.h"
#include "src/globals.h"
namespace v8 {
namespace base {
class Semaphore;
}
namespace internal {
class Counters;
class Isolate;
// This class manages background tasks that process a set of items in parallel.
@@ -32,14 +39,14 @@ class Isolate;
//
// Each parallel (non-main thread) task will report the time between the job
// being created and it being scheduled to |gc_parallel_task_latency_histogram|.
class ItemParallelJob {
class V8_EXPORT_PRIVATE ItemParallelJob {
public:
class Task;
class Item {
class V8_EXPORT_PRIVATE Item {
public:
Item() : state_(kAvailable) {}
virtual ~Item() {}
Item() = default;
virtual ~Item() = default;
// Marks an item as being finished.
void MarkFinished() { CHECK(state_.TrySetValue(kProcessing, kFinished)); }
@@ -52,7 +59,7 @@ class ItemParallelJob {
}
bool IsFinished() { return state_.Value() == kFinished; }
base::AtomicValue<ProcessingState> state_;
base::AtomicValue<ProcessingState> state_{kAvailable};
friend class ItemParallelJob;
friend class ItemParallelJob::Task;
@@ -60,20 +67,10 @@ class ItemParallelJob {
DISALLOW_COPY_AND_ASSIGN(Item);
};
class Task : public CancelableTask {
class V8_EXPORT_PRIVATE Task : public CancelableTask {
public:
explicit Task(Isolate* isolate)
: CancelableTask(isolate),
items_(nullptr),
cur_index_(0),
items_considered_(0),
on_finish_(nullptr) {}
virtual ~Task() {
// The histogram is reset in RunInternal(). If it's still around it means
// this task was cancelled before being scheduled.
if (gc_parallel_task_latency_histogram_)
gc_parallel_task_latency_histogram_->RecordAbandon();
}
explicit Task(Isolate* isolate);
virtual ~Task();
virtual void RunInParallel() = 0;
@@ -97,64 +94,36 @@ class ItemParallelJob {
}
private:
friend class ItemParallelJob;
friend class Item;
// Sets up state required before invoking Run(). If |start_index| is >= the
// number of items, this task will not process work items (some jobs have
// more tasks than work items in order to parallelize post-processing, e.g.
// scavenging). If |gc_parallel_task_latency_histogram| is provided, it will
// be used to report histograms on the latency between posting the task and
// it being scheduled.
void SetupInternal(base::Semaphore* on_finish, std::vector<Item*>* items,
size_t start_index,
base::Optional<AsyncTimedHistogram>
gc_parallel_task_latency_histogram) {
on_finish_ = on_finish;
items_ = items;
if (start_index < items->size()) {
cur_index_ = start_index;
} else {
items_considered_ = items_->size();
}
gc_parallel_task_latency_histogram_ =
std::move(gc_parallel_task_latency_histogram);
}
void SetupInternal(
base::Semaphore* on_finish, std::vector<Item*>* items,
size_t start_index,
base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram);
// We don't allow overriding this method any further.
void RunInternal() final {
if (gc_parallel_task_latency_histogram_) {
gc_parallel_task_latency_histogram_->RecordDone();
gc_parallel_task_latency_histogram_.reset();
}
RunInParallel();
on_finish_->Signal();
}
void RunInternal() final;
std::vector<Item*>* items_;
size_t cur_index_;
size_t items_considered_;
base::Semaphore* on_finish_;
std::vector<Item*>* items_ = nullptr;
size_t cur_index_ = 0;
size_t items_considered_ = 0;
base::Semaphore* on_finish_ = nullptr;
base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram_;
friend class ItemParallelJob;
friend class Item;
DISALLOW_COPY_AND_ASSIGN(Task);
};
ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
base::Semaphore* pending_tasks)
: cancelable_task_manager_(cancelable_task_manager),
pending_tasks_(pending_tasks) {}
~ItemParallelJob() {
for (size_t i = 0; i < items_.size(); i++) {
Item* item = items_[i];
CHECK(item->IsFinished());
delete item;
}
}
base::Semaphore* pending_tasks);
~ItemParallelJob();
// Adds a task to the job. Transfers ownership to the job.
void AddTask(Task* task) { tasks_.push_back(task); }
@@ -167,68 +136,7 @@ class ItemParallelJob {
// Runs this job, reporting metrics in a thread-safe manner to
// |async_counters|.
void Run(std::shared_ptr<Counters> async_counters) {
DCHECK_GT(tasks_.size(), 0);
const size_t num_items = items_.size();
const size_t num_tasks = tasks_.size();
AsyncTimedHistogram gc_parallel_task_latency_histogram(
async_counters->gc_parallel_task_latency(), async_counters);
// Some jobs have more tasks than items (when the items are merely
// coarse-grained work units that generate work dynamically for a second
// phase in which all tasks participate). Some jobs even have 0 items to
// preprocess but still have multiple tasks.
// TODO(gab): Figure out a cleaner scheme for this.
const size_t num_tasks_processing_items = Min(num_items, tasks_.size());
// In the event of an uneven workload, distribute an extra item to the first
// |items_remainder| tasks.
const size_t items_remainder = num_tasks_processing_items > 0
? num_items % num_tasks_processing_items
: 0;
// The base |items_per_task| will be bumped by 1 for the first
// |items_remainder| tasks.
const size_t items_per_task = num_tasks_processing_items > 0
? num_items / num_tasks_processing_items
: 0;
CancelableTaskManager::Id* task_ids =
new CancelableTaskManager::Id[num_tasks];
Task* main_task = nullptr;
for (size_t i = 0, start_index = 0; i < num_tasks;
i++, start_index += items_per_task + (i < items_remainder ? 1 : 0)) {
Task* task = tasks_[i];
// By definition there are fewer |items_remainder| items to distribute than
// there are tasks processing items, so this cannot overflow while we are
// assigning work items.
DCHECK_IMPLIES(start_index >= num_items, i >= num_tasks_processing_items);
task->SetupInternal(pending_tasks_, &items_, start_index,
i > 0 ? gc_parallel_task_latency_histogram
: base::Optional<AsyncTimedHistogram>());
task_ids[i] = task->id();
if (i > 0) {
V8::GetCurrentPlatform()->CallOnBackgroundThread(
task, v8::Platform::kShortRunningTask);
} else {
main_task = task;
}
}
// Contribute on main thread.
main_task->Run();
delete main_task;
// Wait for background tasks.
for (size_t i = 0; i < num_tasks; i++) {
if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
CancelableTaskManager::kTaskAborted) {
pending_tasks_->Wait();
}
}
delete[] task_ids;
}
void Run(std::shared_ptr<Counters> async_counters);
private:
std::vector<Item*> items_;
......
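For context, here is a minimal usage sketch of the ItemParallelJob API declared in this header. It is an editorial illustration, not code from this CL: SweepItem, SweepTask, RunSweepJob, Page and the isolate->async_counters() accessor are assumed/hypothetical names for whatever a real caller provides, and AddItem()/GetItem<T>() live in the parts of the header elided above. A client subclasses Item to carry one unit of work and Task to drain items, then registers items and tasks with a job and calls Run(), which blocks until all tasks (including the main-thread contribution) have finished.

// Hypothetical work unit: one item per page to sweep.
class SweepItem : public ItemParallelJob::Item {
 public:
  explicit SweepItem(Page* page) : page_(page) {}
  Page* page() const { return page_; }

 private:
  Page* page_;
};

// Hypothetical task: drains items until none are left, then returns.
class SweepTask : public ItemParallelJob::Task {
 public:
  explicit SweepTask(Isolate* isolate) : ItemParallelJob::Task(isolate) {}

  void RunInParallel() override {
    while (SweepItem* item = GetItem<SweepItem>()) {
      // ... do the actual sweeping work on item->page() ...
      item->MarkFinished();
    }
  }
};

// Hypothetical driver, e.g. called from a GC phase.
void RunSweepJob(Isolate* isolate, const std::vector<Page*>& pages,
                 base::Semaphore* pending_tasks, int num_tasks) {
  ItemParallelJob job(isolate->cancelable_task_manager(), pending_tasks);
  for (Page* page : pages) job.AddItem(new SweepItem(page));       // job owns items
  for (int i = 0; i < num_tasks; i++) job.AddTask(new SweepTask(isolate));  // and tasks
  job.Run(isolate->async_counters());  // blocks; one task runs on this thread
}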