Commit 05b62681 authored by Etienne Pierre-Doray, committed by Commit Bot

[Jobs]: Add job impl to default platform.

The impl works by posting up to NumberOfWorkerThreads() tasks
with CallOnWorkerThread().

Change-Id: I188ac57c9e5d6e3befdcc6f945fbf337dabe1d1d
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2130886
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: Gabriel Charette <gab@chromium.org>
Reviewed-by: Ulan Degenbaev <ulan@chromium.org>
Reviewed-by: Michael Lippautz <mlippautz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#67368}
parent 30350b65
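As the commit message notes, the default implementation posts up to NumberOfWorkerThreads() worker tasks with CallOnWorkerThread(); each posted task then repeatedly runs the JobTask until the job is saturated, cancelled, or out of work. The following is a simplified sketch of that per-task loop, restating DefaultJobWorker::Run() from the diff below (the free-function form and its name are illustrative; the real worker holds a weak_ptr to the job state):

#include <memory>
#include "src/libplatform/default-job.h"

// Simplified sketch only -- mirrors DefaultJobWorker::Run() added below.
void RunOnePostedWorkerTask(
    std::shared_ptr<v8::platform::DefaultJobState> state,
    v8::JobTask* job_task) {
  // Bail out if the job was cancelled or already has enough active workers.
  if (!state->CanRunFirstTask()) return;
  do {
    // Run the embedder-provided work; it returns when it should yield or
    // when it has no more work items.
    job_task->Run(state.get());
    // DidRunTask() either releases this worker (concurrency dropped or the
    // job was cancelled) or asks it to loop again, possibly posting more
    // tasks if max concurrency increased in the meantime.
  } while (state->DidRunTask());
}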
......@@ -3902,6 +3902,8 @@ v8_component("v8_libplatform") {
"include/libplatform/v8-tracing.h",
"src/libplatform/default-foreground-task-runner.cc",
"src/libplatform/default-foreground-task-runner.h",
"src/libplatform/default-job.cc",
"src/libplatform/default-job.h",
"src/libplatform/default-platform.cc",
"src/libplatform/default-platform.h",
"src/libplatform/default-worker-threads-task-runner.cc",
......
......@@ -17,6 +17,28 @@ namespace v8 {
class Isolate;
// Valid priorities supported by the task scheduling infrastructure.
enum class TaskPriority : uint8_t {
/**
* Best effort tasks are not critical for performance of the application. The
* platform implementation should preempt such tasks if higher priority tasks
* arrive.
*/
kBestEffort,
/**
* User visible tasks are long running background tasks that will
* improve performance and memory usage of the application upon completion.
* Example: background compilation and garbage collection.
*/
kUserVisible,
/**
* User blocking tasks are highest priority tasks that block the execution
* thread (e.g. major garbage collection). They must be finished as soon as
* possible.
*/
kUserBlocking,
};
/**
* A Task represents a unit of work.
*/
......@@ -113,6 +135,82 @@ class TaskRunner {
TaskRunner& operator=(const TaskRunner&) = delete;
};
/**
* Delegate that's passed to Job's worker task, providing an entry point to
* communicate with the scheduler.
*/
class JobDelegate {
public:
/**
   * Returns true if the worker task should return from Run() on the current
   * thread ASAP. Workers should periodically invoke ShouldYield() (or
   * YieldIfNeeded()) as often as is reasonable.
*/
virtual bool ShouldYield() = 0;
/**
* Notifies the scheduler that max concurrency was increased, and the number
   * of workers should be adjusted accordingly. See Platform::PostJob() for more
* details.
*/
virtual void NotifyConcurrencyIncrease() = 0;
};
/**
* Handle returned when posting a Job. Provides methods to control execution of
* the posted Job.
*/
class JobHandle {
public:
virtual ~JobHandle() = default;
/**
* Notifies the scheduler that max concurrency was increased, and the number
   * of workers should be adjusted accordingly. See Platform::PostJob() for more
* details.
*/
virtual void NotifyConcurrencyIncrease() = 0;
/**
* Contributes to the job on this thread. Doesn't return until all tasks have
* completed and max concurrency becomes 0. When Join() is called and max
* concurrency reaches 0, it should not increase again. This also promotes
* this Job's priority to be at least as high as the calling thread's
* priority.
*/
virtual void Join() = 0;
/**
* Forces all existing workers to yield ASAP. Waits until they have all
* returned from the Job's callback before returning.
*/
virtual void Cancel() = 0;
/**
* Returns true if associated with a Job and other methods may be called.
* Returns false after Join() or Cancel() was called.
*/
virtual bool IsRunning() = 0;
};
/**
* A JobTask represents work to run in parallel from Platform::PostJob().
*/
class JobTask {
public:
virtual ~JobTask() = default;
virtual void Run(JobDelegate* delegate) = 0;
/**
* Controls the maximum number of threads calling Run() concurrently. Run() is
* only invoked if the number of threads previously running Run() was less
* than the value returned. Since GetMaxConcurrency() is a leaf function, it
* must not call back any JobHandle methods.
*/
virtual size_t GetMaxConcurrency() const = 0;
};
/**
* The interface represents complex arguments to trace events.
*/
......@@ -373,6 +471,64 @@ class Platform {
*/
virtual bool IdleTasksEnabled(Isolate* isolate) { return false; }
/**
* Posts |job_task| to run in parallel. Returns a JobHandle associated with
* the Job, which can be joined or canceled.
* This avoids degenerate cases:
* - Calling CallOnWorkerThread() for each work item, causing significant
* overhead.
* - Fixed number of CallOnWorkerThread() calls that split the work and might
* run for a long time. This is problematic when many components post
* "num cores" tasks and all expect to use all the cores. In these cases,
* the scheduler lacks context to be fair to multiple same-priority requests
* and/or ability to request lower priority work to yield when high priority
* work comes in.
* A canonical implementation of |job_task| looks like:
* class MyJobTask : public JobTask {
* public:
* MyJobTask(...) : worker_queue_(...) {}
* // JobTask:
* void Run(JobDelegate* delegate) override {
* while (!delegate->ShouldYield()) {
* // Smallest unit of work.
* auto work_item = worker_queue_.TakeWorkItem(); // Thread safe.
* if (!work_item) return;
* ProcessWork(work_item);
* }
* }
*
* size_t GetMaxConcurrency() const override {
* return worker_queue_.GetSize(); // Thread safe.
* }
* };
* auto handle = PostJob(TaskPriority::kUserVisible,
* std::make_unique<MyJobTask>(...));
* handle->Join();
*
   * PostJob() and methods of the returned JobHandle/JobDelegate must never be
* called while holding a lock that could be acquired by JobTask::Run or
* JobTask::GetMaxConcurrency -- that could result in a deadlock. This is
* because [1] JobTask::GetMaxConcurrency may be invoked while holding
* internal lock (A), hence JobTask::GetMaxConcurrency can only use a lock (B)
* if that lock is *never* held while calling back into JobHandle from any
* thread (A=>B/B=>A deadlock) and [2] JobTask::Run or
* JobTask::GetMaxConcurrency may be invoked synchronously from JobHandle
* (B=>JobHandle::foo=>B deadlock).
*
* A sufficient PostJob() implementation that uses the default Job provided in
* libplatform looks like:
* std::unique_ptr<JobHandle> PostJob(
* TaskPriority priority, std::unique_ptr<JobTask> job_task) override {
* return std::make_unique<DefaultJobHandle>(
* std::make_shared<DefaultJobState>(
   * this, std::move(job_task), priority, kNumThreads));
* }
*/
virtual std::unique_ptr<JobHandle> PostJob(
TaskPriority priority, std::unique_ptr<JobTask> job_task) {
return nullptr;
}
/**
* Monotonically increasing time in seconds from an arbitrary fixed point in
* the past. This function is expected to return at least
......
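To complement the canonical example in the comment above, here is a hedged usage sketch of the new API from the embedder's side: post a JobTask whose GetMaxConcurrency() tracks the remaining work, call NotifyConcurrencyIncrease() whenever items are added after posting, and Join() or Cancel() before destroying the handle. MyJobTask, remaining_items_ and ProcessOneItem() are illustrative names, not part of this CL:

#include <atomic>
#include <cstddef>
#include <memory>
#include "include/v8-platform.h"

class MyJobTask : public v8::JobTask {
 public:
  void Run(v8::JobDelegate* delegate) override {
    // Yield promptly when the scheduler asks; otherwise drain items.
    while (!delegate->ShouldYield()) {
      if (!ProcessOneItem()) return;  // No work left for this worker.
    }
  }
  size_t GetMaxConcurrency() const override {
    // At most one worker per remaining item; thread-safe, may be stale.
    return remaining_items_.load(std::memory_order_relaxed);
  }

 private:
  bool ProcessOneItem() {
    // Illustrative only: claim one item by decrementing the counter.
    size_t current = remaining_items_.load(std::memory_order_relaxed);
    while (current != 0 &&
           !remaining_items_.compare_exchange_weak(current, current - 1,
                                                   std::memory_order_relaxed)) {
    }
    if (current == 0) return false;
    // ... process the claimed work item here ...
    return true;
  }
  std::atomic<size_t> remaining_items_{0};
};

// Embedder side (sketch):
//   auto handle = platform->PostJob(v8::TaskPriority::kUserVisible,
//                                   std::make_unique<MyJobTask>());
//   ... add items, then handle->NotifyConcurrencyIncrease(); ...
//   handle->Join();  // or handle->Cancel() -- before |handle| is destroyed.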
// Copyright 2020 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/libplatform/default-job.h"
namespace v8 {
namespace platform {
DefaultJobState::~DefaultJobState() { DCHECK_EQ(0U, active_workers_); }
void DefaultJobState::NotifyConcurrencyIncrease() {
if (is_canceled_.load(std::memory_order_relaxed)) return;
size_t num_tasks_to_post = 0;
{
base::MutexGuard guard(&mutex_);
const size_t max_concurrency = CappedMaxConcurrency();
// Consider |pending_tasks_| to avoid posting too many tasks.
if (max_concurrency > (active_workers_ + pending_tasks_)) {
num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
pending_tasks_ += num_tasks_to_post;
}
}
// Post additional worker tasks to reach |max_concurrency|.
for (size_t i = 0; i < num_tasks_to_post; ++i) {
CallOnWorkerThread(std::make_unique<DefaultJobWorker>(shared_from_this(),
job_task_.get()));
}
}
void DefaultJobState::Join() {
bool can_run = false;
{
base::MutexGuard guard(&mutex_);
priority_ = TaskPriority::kUserBlocking;
// Reserve a worker for the joining thread. GetMaxConcurrency() is ignored
// here, but WaitForParticipationOpportunityLockRequired() waits for
// workers to return if necessary so we don't exceed GetMaxConcurrency().
num_worker_threads_ = platform_->NumberOfWorkerThreads() + 1;
++active_workers_;
can_run = WaitForParticipationOpportunityLockRequired();
}
while (can_run) {
job_task_->Run(this);
base::MutexGuard guard(&mutex_);
can_run = WaitForParticipationOpportunityLockRequired();
}
}
void DefaultJobState::CancelAndWait() {
{
base::MutexGuard guard(&mutex_);
is_canceled_.store(true, std::memory_order_relaxed);
while (active_workers_ > 0) {
worker_released_condition_.Wait(&mutex_);
}
}
}
bool DefaultJobState::CanRunFirstTask() {
base::MutexGuard guard(&mutex_);
--pending_tasks_;
if (is_canceled_.load(std::memory_order_relaxed)) return false;
if (active_workers_ >=
std::min(job_task_->GetMaxConcurrency(), num_worker_threads_)) {
return false;
}
// Acquire current worker.
++active_workers_;
return true;
}
bool DefaultJobState::DidRunTask() {
size_t num_tasks_to_post = 0;
{
base::MutexGuard guard(&mutex_);
const size_t max_concurrency = CappedMaxConcurrency();
if (is_canceled_.load(std::memory_order_relaxed) ||
active_workers_ > max_concurrency) {
// Release current worker and notify.
--active_workers_;
worker_released_condition_.NotifyOne();
return false;
}
// Consider |pending_tasks_| to avoid posting too many tasks.
if (max_concurrency > active_workers_ + pending_tasks_) {
num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
pending_tasks_ += num_tasks_to_post;
}
}
// Post additional worker tasks to reach |max_concurrency| in the case that
// max concurrency increased. This is not strictly necessary, since
// NotifyConcurrencyIncrease() should eventually be invoked. However, some
// users of PostJob() batch work and tend to call NotifyConcurrencyIncrease()
// late. Posting here allows us to spawn new workers sooner.
for (size_t i = 0; i < num_tasks_to_post; ++i) {
CallOnWorkerThread(std::make_unique<DefaultJobWorker>(shared_from_this(),
job_task_.get()));
}
return true;
}
bool DefaultJobState::WaitForParticipationOpportunityLockRequired() {
size_t max_concurrency = CappedMaxConcurrency();
while (active_workers_ > max_concurrency && active_workers_ > 1) {
worker_released_condition_.Wait(&mutex_);
max_concurrency = CappedMaxConcurrency();
}
if (active_workers_ <= max_concurrency) return true;
DCHECK_EQ(1U, active_workers_);
DCHECK_EQ(0U, max_concurrency);
active_workers_ = 0;
is_canceled_.store(true, std::memory_order_relaxed);
return false;
}
size_t DefaultJobState::CappedMaxConcurrency() const {
return std::min(job_task_->GetMaxConcurrency(), num_worker_threads_);
}
void DefaultJobState::CallOnWorkerThread(std::unique_ptr<Task> task) {
switch (priority_) {
case TaskPriority::kBestEffort:
return platform_->CallLowPriorityTaskOnWorkerThread(std::move(task));
case TaskPriority::kUserVisible:
return platform_->CallOnWorkerThread(std::move(task));
case TaskPriority::kUserBlocking:
return platform_->CallBlockingTaskOnWorkerThread(std::move(task));
}
}
DefaultJobHandle::DefaultJobHandle(std::shared_ptr<DefaultJobState> state)
: state_(std::move(state)) {
state_->NotifyConcurrencyIncrease();
}
DefaultJobHandle::~DefaultJobHandle() { DCHECK_EQ(nullptr, state_); }
void DefaultJobHandle::Join() {
state_->Join();
state_ = nullptr;
}
void DefaultJobHandle::Cancel() {
state_->CancelAndWait();
state_ = nullptr;
}
} // namespace platform
} // namespace v8
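A worked illustration of the task-posting arithmetic shared by NotifyConcurrencyIncrease() and DidRunTask() above; the numbers are hypothetical and only show how |pending_tasks_| keeps the job from over-posting:

#include <cstddef>

// Hypothetical snapshot of the job state while holding |mutex_|.
size_t NumTasksToPostExample() {
  size_t max_concurrency = 6;  // CappedMaxConcurrency()
  size_t active_workers = 2;   // workers currently inside job_task_->Run()
  size_t pending_tasks = 1;    // posted worker tasks that have not started yet
  // 6 > 2 + 1, so 6 - 2 - 1 == 3 more tasks are posted and pending_tasks
  // becomes 4; active + pending therefore never exceeds the cap.
  if (max_concurrency > active_workers + pending_tasks)
    return max_concurrency - active_workers - pending_tasks;  // == 3
  return 0;
}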
// Copyright 2020 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef V8_LIBPLATFORM_DEFAULT_JOB_H_
#define V8_LIBPLATFORM_DEFAULT_JOB_H_
#include <atomic>
#include <memory>
#include "include/libplatform/libplatform-export.h"
#include "include/v8-platform.h"
#include "src/base/platform/condition-variable.h"
#include "src/base/platform/mutex.h"
namespace v8 {
namespace platform {
class V8_PLATFORM_EXPORT DefaultJobState
: NON_EXPORTED_BASE(public JobDelegate),
public std::enable_shared_from_this<DefaultJobState> {
public:
DefaultJobState(Platform* platform, std::unique_ptr<JobTask> job_task,
TaskPriority priority, size_t num_worker_threads)
: platform_(platform),
job_task_(std::move(job_task)),
priority_(priority),
num_worker_threads_(num_worker_threads) {}
virtual ~DefaultJobState();
void NotifyConcurrencyIncrease() override;
bool ShouldYield() override {
// Thread-safe but may return an outdated result.
return is_canceled_.load(std::memory_order_relaxed);
}
void Join();
void CancelAndWait();
// Must be called before running |job_task_| for the first time. Returns true
// if the worker thread should contribute to the job (and must later call
// DidRunTask()), or false if it should return immediately.
bool CanRunFirstTask();
// Must be called after running |job_task_|. Returns true if the worker thread
// must contribute again, or false if it should return.
bool DidRunTask();
private:
// Called from the joining thread. Waits for the worker count to be below or
// equal to max concurrency (will happen when a worker calls
// DidRunTask()). Returns true if the joining thread should run a task, or
// false if joining was completed and all other workers returned because
// there's no work remaining.
bool WaitForParticipationOpportunityLockRequired();
// Returns GetMaxConcurrency() capped by the number of threads used by this
// job.
size_t CappedMaxConcurrency() const;
void CallOnWorkerThread(std::unique_ptr<Task> task);
Platform* const platform_;
std::unique_ptr<JobTask> job_task_;
// All members below are protected by |mutex_|.
base::Mutex mutex_;
TaskPriority priority_;
// Number of workers running this job.
size_t active_workers_ = 0;
// Number of posted tasks that aren't running this job yet.
size_t pending_tasks_ = 0;
// Indicates if the job is canceled.
std::atomic_bool is_canceled_{false};
// Number of worker threads available to schedule the worker task.
size_t num_worker_threads_;
// Signaled when a worker returns.
base::ConditionVariable worker_released_condition_;
};
class V8_PLATFORM_EXPORT DefaultJobHandle : public JobHandle {
public:
explicit DefaultJobHandle(std::shared_ptr<DefaultJobState> state);
~DefaultJobHandle() override;
void NotifyConcurrencyIncrease() override {
state_->NotifyConcurrencyIncrease();
}
void Join() override;
void Cancel() override;
bool IsRunning() override { return state_ != nullptr; }
private:
std::shared_ptr<DefaultJobState> state_;
DISALLOW_COPY_AND_ASSIGN(DefaultJobHandle);
};
class DefaultJobWorker : public Task {
public:
DefaultJobWorker(std::weak_ptr<DefaultJobState> state, JobTask* job_task)
: state_(std::move(state)), job_task_(job_task) {}
~DefaultJobWorker() override = default;
void Run() override {
auto shared_state = state_.lock();
if (!shared_state) return;
if (!shared_state->CanRunFirstTask()) return;
do {
job_task_->Run(shared_state.get());
} while (shared_state->DidRunTask());
}
private:
friend class DefaultJob;
std::weak_ptr<DefaultJobState> state_;
JobTask* job_task_;
DISALLOW_COPY_AND_ASSIGN(DefaultJobWorker);
};
} // namespace platform
} // namespace v8
#endif // V8_LIBPLATFORM_DEFAULT_JOB_H_
......@@ -15,6 +15,7 @@
#include "src/base/platform/time.h"
#include "src/base/sys-info.h"
#include "src/libplatform/default-foreground-task-runner.h"
#include "src/libplatform/default-job.h"
#include "src/libplatform/default-worker-threads-task-runner.h"
namespace v8 {
......@@ -201,6 +202,24 @@ bool DefaultPlatform::IdleTasksEnabled(Isolate* isolate) {
return idle_task_support_ == IdleTaskSupport::kEnabled;
}
std::unique_ptr<JobHandle> DefaultPlatform::PostJob(
TaskPriority priority, std::unique_ptr<JobTask> job_task) {
size_t num_worker_threads = 0;
switch (priority) {
case TaskPriority::kUserBlocking:
num_worker_threads = NumberOfWorkerThreads();
break;
case TaskPriority::kUserVisible:
num_worker_threads = NumberOfWorkerThreads() / 2;
break;
case TaskPriority::kBestEffort:
num_worker_threads = 1;
break;
}
return std::make_unique<DefaultJobHandle>(std::make_shared<DefaultJobState>(
this, std::move(job_task), priority, num_worker_threads));
}
double DefaultPlatform::MonotonicallyIncreasingTime() {
if (time_function_for_testing_) return time_function_for_testing_();
return DefaultTimeFunction();
......
......@@ -62,6 +62,8 @@ class V8_PLATFORM_EXPORT DefaultPlatform : public NON_EXPORTED_BASE(Platform) {
void CallDelayedOnWorkerThread(std::unique_ptr<Task> task,
double delay_in_seconds) override;
bool IdleTasksEnabled(Isolate* isolate) override;
std::unique_ptr<JobHandle> PostJob(
TaskPriority priority, std::unique_ptr<JobTask> job_state) override;
double MonotonicallyIncreasingTime() override;
double CurrentClockTimeMillis() override;
v8::TracingController* GetTracingController() override;
......
......@@ -704,6 +704,12 @@ class TestPlatform : public v8::Platform {
old_platform_->CallDelayedOnWorkerThread(std::move(task), delay_in_seconds);
}
std::unique_ptr<v8::JobHandle> PostJob(
v8::TaskPriority priority,
std::unique_ptr<v8::JobTask> job_task) override {
return old_platform_->PostJob(priority, std::move(job_task));
}
double MonotonicallyIncreasingTime() override {
return old_platform_->MonotonicallyIncreasingTime();
}
......
......@@ -253,6 +253,7 @@ v8_source_set("unittests_sources") {
"interpreter/constant-array-builder-unittest.cc",
"interpreter/interpreter-assembler-unittest.cc",
"interpreter/interpreter-assembler-unittest.h",
"libplatform/default-job-unittest.cc",
"libplatform/default-platform-unittest.cc",
"libplatform/default-worker-threads-task-runner-unittest.cc",
"libplatform/task-queue-unittest.cc",
......
......@@ -140,6 +140,11 @@ class MockPlatform : public v8::Platform {
bool IdleTasksEnabled(v8::Isolate* isolate) override { return true; }
std::unique_ptr<JobHandle> PostJob(
TaskPriority priority, std::unique_ptr<JobTask> job_state) override {
UNREACHABLE();
}
double MonotonicallyIncreasingTime() override {
time_ += time_step_;
return time_;
......
// Copyright 2020 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/libplatform/default-job.h"
#include "src/base/platform/condition-variable.h"
#include "src/base/platform/platform.h"
#include "src/libplatform/default-platform.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace v8 {
namespace platform {
namespace default_job_unittest {
class DefaultJobTest : public ::testing::Test {
public:
DefaultPlatform* platform() { return &platform_; }
private:
DefaultPlatform platform_;
};
// Verify that Cancel() on a job stops running the worker task and causes
// current workers to yield.
TEST_F(DefaultJobTest, CancelJob) {
static constexpr size_t kTooManyTasks = 1000;
static constexpr size_t kMaxTask = 4;
platform()->SetThreadPoolSize(kMaxTask);
// This Job notifies |threads_running| once started and loops until
// ShouldYield() returns true, and then returns.
class JobTest : public JobTask {
public:
~JobTest() override = default;
void Run(JobDelegate* delegate) override {
{
base::MutexGuard guard(&mutex);
worker_count++;
}
threads_running.NotifyOne();
while (!delegate->ShouldYield()) {
}
}
size_t GetMaxConcurrency() const override {
return max_concurrency.load(std::memory_order_relaxed);
}
base::Mutex mutex;
base::ConditionVariable threads_running;
size_t worker_count = 0;
std::atomic_size_t max_concurrency{kTooManyTasks};
};
auto job = std::make_unique<JobTest>();
JobTest* job_raw = job.get();
auto state = std::make_shared<DefaultJobState>(
platform(), std::move(job), TaskPriority::kUserVisible, kMaxTask);
state->NotifyConcurrencyIncrease();
{
base::MutexGuard guard(&job_raw->mutex);
while (job_raw->worker_count < kMaxTask) {
job_raw->threads_running.Wait(&job_raw->mutex);
}
EXPECT_EQ(kMaxTask, job_raw->worker_count);
}
state->CancelAndWait();
// Workers should return and this test should not hang.
}
// Verify that Join() on a job contributes to max concurrency and waits for all
// workers to return.
TEST_F(DefaultJobTest, JoinJobContributes) {
static constexpr size_t kMaxTask = 4;
platform()->SetThreadPoolSize(kMaxTask);
// This Job notifies |threads_running| once started and blocks on a barrier
// until kMaxTask + 1 threads reach that point, and then returns.
class JobTest : public JobTask {
public:
~JobTest() override = default;
void Run(JobDelegate* delegate) override {
base::MutexGuard guard(&mutex);
worker_count++;
threads_running.NotifyAll();
while (worker_count < kMaxTask + 1) threads_running.Wait(&mutex);
--max_concurrency;
}
size_t GetMaxConcurrency() const override {
return max_concurrency.load(std::memory_order_relaxed);
}
base::Mutex mutex;
base::ConditionVariable threads_running;
size_t worker_count = 0;
std::atomic_size_t max_concurrency{kMaxTask + 1};
};
auto job = std::make_unique<JobTest>();
JobTest* job_raw = job.get();
auto state = std::make_shared<DefaultJobState>(
platform(), std::move(job), TaskPriority::kUserVisible, kMaxTask);
state->NotifyConcurrencyIncrease();
// The main thread contributing is necessary for |worker_count| to reach
// kMaxTask + 1; thus, Join() should not hang.
state->Join();
EXPECT_EQ(0U, job_raw->max_concurrency);
}
// Verify that calling NotifyConcurrencyIncrease() (re-)schedules tasks with the
// intended concurrency.
TEST_F(DefaultJobTest, JobNotifyConcurrencyIncrease) {
static constexpr size_t kMaxTask = 4;
platform()->SetThreadPoolSize(kMaxTask);
// This Job notifies |threads_running| once started and blocks on a barrier
// until kMaxTask threads reach that point, and then returns.
class JobTest : public JobTask {
public:
~JobTest() override = default;
void Run(JobDelegate* delegate) override {
base::MutexGuard guard(&mutex);
worker_count++;
threads_running.NotifyAll();
// Wait synchronously until |kMaxTask| workers reach this point.
while (worker_count < kMaxTask) threads_running.Wait(&mutex);
--max_concurrency;
}
size_t GetMaxConcurrency() const override {
return max_concurrency.load(std::memory_order_relaxed);
}
base::Mutex mutex;
base::ConditionVariable threads_running;
bool continue_flag = false;
size_t worker_count = 0;
std::atomic_size_t max_concurrency{kMaxTask / 2};
};
auto job = std::make_unique<JobTest>();
JobTest* job_raw = job.get();
auto state = std::make_shared<DefaultJobState>(
platform(), std::move(job), TaskPriority::kUserVisible, kMaxTask);
state->NotifyConcurrencyIncrease();
{
base::MutexGuard guard(&job_raw->mutex);
while (job_raw->worker_count < kMaxTask / 2)
job_raw->threads_running.Wait(&job_raw->mutex);
EXPECT_EQ(kMaxTask / 2, job_raw->worker_count);
job_raw->max_concurrency = kMaxTask;
}
state->NotifyConcurrencyIncrease();
// Workers should eventually pass the barrier and return; thus, Join()
// should not hang.
state->Join();
EXPECT_EQ(0U, job_raw->max_concurrency);
}
// Verify that Join() doesn't contribute if the Job is already finished.
TEST_F(DefaultJobTest, FinishBeforeJoin) {
static constexpr size_t kMaxTask = 4;
platform()->SetThreadPoolSize(kMaxTask);
// This Job notifies |threads_running| once started and returns.
class JobTest : public JobTask {
public:
~JobTest() override = default;
void Run(JobDelegate* delegate) override {
base::MutexGuard guard(&mutex);
worker_count++;
// Assert that main thread doesn't contribute in this test.
EXPECT_NE(main_thread_id, base::OS::GetCurrentThreadId());
worker_ran.NotifyAll();
--max_concurrency;
}
size_t GetMaxConcurrency() const override {
return max_concurrency.load(std::memory_order_relaxed);
}
const int main_thread_id = base::OS::GetCurrentThreadId();
base::Mutex mutex;
base::ConditionVariable worker_ran;
size_t worker_count = 0;
std::atomic_size_t max_concurrency{kMaxTask * 5};
};
auto job = std::make_unique<JobTest>();
JobTest* job_raw = job.get();
auto state = std::make_shared<DefaultJobState>(
platform(), std::move(job), TaskPriority::kUserVisible, kMaxTask);
state->NotifyConcurrencyIncrease();
{
base::MutexGuard guard(&job_raw->mutex);
while (job_raw->worker_count < kMaxTask * 5)
job_raw->worker_ran.Wait(&job_raw->mutex);
EXPECT_EQ(kMaxTask * 5, job_raw->worker_count);
}
state->Join();
EXPECT_EQ(0U, job_raw->max_concurrency);
}
// Verify that destroying a DefaultJobHandle triggers a DCHECK if neither Join()
// nor Cancel() was called.
TEST_F(DefaultJobTest, LeakHandle) {
class JobTest : public JobTask {
public:
~JobTest() override = default;
void Run(JobDelegate* delegate) override {}
size_t GetMaxConcurrency() const override { return 0; }
};
auto job = std::make_unique<JobTest>();
auto state = std::make_shared<DefaultJobState>(platform(), std::move(job),
TaskPriority::kUserVisible, 1);
auto handle = std::make_unique<DefaultJobHandle>(std::move(state));
#ifdef DEBUG
EXPECT_DEATH_IF_SUPPORTED({ handle.reset(); }, "");
#endif // DEBUG
handle->Join();
}
} // namespace default_job_unittest
} // namespace platform
} // namespace v8