Commit 60e7ea8a authored by mlippautz, committed by Commit bot

Add {CancelableTaskManager} to handle {Cancelable} concurrent tasks.

This change binds each {Cancelable} task to a so-called {CancelableTaskManager},
which is then used to handle concurrent cancelation as well as to synchronize
shutdown with already running tasks. Since ownership of a task is transferred to
the platform that executes it (and that eventually destroys it), the manager
refers to tasks by integer ids. Note that this also mitigates (modulo integer
size) the ABA problem.

All handling of {Cancelable} tasks is now encapsulated in the corresponding
manager, which is instantiated once per isolate.

R=hpayer@chromium.org
BUG=chromium:524425
LOG=N
CQ_EXTRA_TRYBOTS=tryserver.v8:v8_linux_arm64_gc_stress_dbg;tryserver.v8:v8_linux_gc_stress_dbg;tryserver.v8:v8_mac_gc_stress_dbg;tryserver.v8:v8_linux64_msan_rel;tryserver.v8:v8_linux64_tsan_rel;tryserver.v8:v8_mac64_asan_rel

Review URL: https://codereview.chromium.org/1409993012

Cr-Commit-Position: refs/heads/master@{#31997}
parent 61a39335
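
Editorial aside (not part of this commit): the core of the concurrent cancelation is a three-state handshake between the thread that runs a task and the manager that tries to cancel it. The following minimal, self-contained C++ sketch models that handshake; SketchTask and its members are hypothetical stand-ins for {Cancelable}, which implements the same transitions via {AtomicValue<Status>::TrySetValue} in the diff below.

#include <atomic>
#include <cstdio>

// Hypothetical model of {Cancelable}'s status field: whoever flips the status
// away from kWaiting first wins, so a task either runs exactly once or is
// canceled, never both.
enum Status { kWaiting, kCanceled, kRunning };

struct SketchTask {
  std::atomic<Status> status{kWaiting};

  // Worker side: claim the task for execution; fails if it was canceled.
  bool TryRun() {
    Status expected = kWaiting;
    return status.compare_exchange_strong(expected, kRunning);
  }

  // Manager side: cancel the task only if it has not started running yet.
  bool Cancel() {
    Status expected = kWaiting;
    return status.compare_exchange_strong(expected, kCanceled);
  }
};

int main() {
  SketchTask task;
  std::printf("cancel before running: %d\n", task.Cancel());  // prints 1
  std::printf("try to run afterwards: %d\n", task.TryRun());  // prints 0
  return 0;
}
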
@@ -11,18 +11,99 @@ namespace v8 {
namespace internal {
Cancelable::Cancelable(Isolate* isolate)
: isolate_(isolate), is_cancelled_(false) {
isolate->RegisterCancelableTask(this);
Cancelable::Cancelable(CancelableTaskManager* parent)
: parent_(parent), status_(kWaiting), id_(0), cancel_counter_(0) {
id_ = parent->Register(this);
CHECK(id_ != 0);
}
Cancelable::~Cancelable() {
if (!is_cancelled_) {
isolate_->RemoveCancelableTask(this);
// The following check is needed to avoid calling an already terminated
// manager object. This can happen when the manager cancels all pending tasks
// in {CancelAndWait} just before the manager object itself is destroyed.
if (TryRun() || IsRunning()) {
parent_->TryAbort(id_);
}
}
static bool ComparePointers(void* ptr1, void* ptr2) { return ptr1 == ptr2; }
CancelableTaskManager::CancelableTaskManager()
: task_id_counter_(0), cancelable_tasks_(ComparePointers) {}
uint32_t CancelableTaskManager::Register(Cancelable* task) {
base::LockGuard<base::Mutex> guard(&mutex_);
uint32_t id = ++task_id_counter_;
// The loop below is just used when task_id_counter_ overflows.
while ((id == 0) || (cancelable_tasks_.Lookup(reinterpret_cast<void*>(id),
id) != nullptr)) {
++id;
}
HashMap::Entry* entry =
cancelable_tasks_.LookupOrInsert(reinterpret_cast<void*>(id), id);
entry->value = task;
return id;
}
bool CancelableTaskManager::TryAbort(uint32_t id) {
base::LockGuard<base::Mutex> guard(&mutex_);
Cancelable* value = reinterpret_cast<Cancelable*>(
cancelable_tasks_.Remove(reinterpret_cast<void*>(id), id));
if (value != nullptr) {
bool success = value->Cancel();
cancelable_tasks_barrier_.NotifyOne();
if (!success) return false;
return true;
}
return false;
}
void CancelableTaskManager::CancelAndWait() {
// Clean up all cancelable fore- and background tasks. Tasks are canceled on
// the way if possible, i.e., if they have not started yet. After each round
// of canceling we wait for the background tasks that have already been
// started.
base::LockGuard<base::Mutex> guard(&mutex_);
// HashMap does not support removing while iterating, hence keep a set of
// entries that are to be removed.
std::set<uint32_t> to_remove;
// Cancelable tasks could potentially register new tasks, requiring a loop
// here.
while (cancelable_tasks_.occupancy() > 0) {
for (HashMap::Entry* p = cancelable_tasks_.Start(); p != nullptr;
p = cancelable_tasks_.Next(p)) {
if (reinterpret_cast<Cancelable*>(p->value)->Cancel()) {
to_remove.insert(reinterpret_cast<Cancelable*>(p->value)->id());
}
}
// Remove tasks that were successfully canceled.
for (auto id : to_remove) {
cancelable_tasks_.Remove(reinterpret_cast<void*>(id), id);
}
to_remove.clear();
// Finally, wait for already running background tasks.
if (cancelable_tasks_.occupancy() > 0) {
cancelable_tasks_barrier_.Wait(&mutex_);
}
}
}
CancelableTask::CancelableTask(Isolate* isolate)
: Cancelable(isolate->cancelable_task_manager()), isolate_(isolate) {}
CancelableIdleTask::CancelableIdleTask(Isolate* isolate)
: Cancelable(isolate->cancelable_task_manager()), isolate_(isolate) {}
} // namespace internal
} // namespace v8
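
Editorial aside (not part of this commit): the cancel-and-wait pattern implemented above can be modeled with standard library primitives. The sketch below is a simplified, self-contained illustration using std::mutex and std::condition_variable in place of V8's base:: types; SketchManager and SketchTask are hypothetical names, and the outer loop mirrors the one in {CancelAndWait}, which re-checks because tasks may register further tasks.

#include <condition_variable>
#include <cstdint>
#include <map>
#include <mutex>

// Hypothetical task handle: it can be canceled only while it has not started.
struct SketchTask {
  bool started = false;
  bool Cancel() { return !started; }
};

class SketchManager {
 public:
  uint32_t Register(SketchTask* task) {
    std::lock_guard<std::mutex> guard(mutex_);
    uint32_t id = ++task_id_counter_;
    tasks_[id] = task;
    return id;
  }

  // Called by a task that did run, once it finishes, to deregister itself.
  void Remove(uint32_t id) {
    std::lock_guard<std::mutex> guard(mutex_);
    tasks_.erase(id);
    barrier_.notify_one();
  }

  // Cancel what has not started; wait for the rest to call Remove().
  void CancelAndWait() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (!tasks_.empty()) {
      for (auto it = tasks_.begin(); it != tasks_.end();) {
        if (it->second->Cancel()) {
          it = tasks_.erase(it);  // canceled before it ever ran
        } else {
          ++it;                   // already running, must be waited for
        }
      }
      if (!tasks_.empty()) barrier_.wait(lock);
    }
  }

 private:
  uint32_t task_id_counter_ = 0;
  std::map<uint32_t, SketchTask*> tasks_;
  std::condition_variable barrier_;
  std::mutex mutex_;
};

int main() {
  SketchManager manager;
  SketchTask task;           // never started, so it can still be canceled
  manager.Register(&task);
  manager.CancelAndWait();   // cancels the task and returns immediately
  return 0;
}
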
@@ -6,26 +6,108 @@
#define V8_CANCELABLE_TASK_H_
#include "include/v8-platform.h"
#include "src/atomic-utils.h"
#include "src/base/macros.h"
#include "src/base/platform/condition-variable.h"
#include "src/hashmap.h"
namespace v8 {
namespace internal {
class Cancelable;
class Isolate;
// Keeps track of cancelable tasks. It is possible to register and remove tasks
// from any fore- and background task/thread.
class CancelableTaskManager {
public:
CancelableTaskManager();
// Registers a new cancelable {task}. Returns the unique {id} of the task,
// which can be used to try to abort the task by calling {TryAbort}.
uint32_t Register(Cancelable* task);
// Try to abort running a task identified by {id}. The possible outcomes are:
// (1) The task is already finished running and thus has been removed from
// the manager.
// (2) The task is currently running and cannot be canceled anymore.
// (3) The task is not yet running (or finished) so it is canceled and
// removed.
//
// Returns {false} for (1) and (2), and {true} for (3).
bool TryAbort(uint32_t id);
// Cancels all remaining registered tasks and waits for tasks that are
// already running.
void CancelAndWait();
private:
// To mitigate the ABA problem, the API refers to tasks through an id.
uint32_t task_id_counter_;
// A set of cancelable tasks that are currently registered.
HashMap cancelable_tasks_;
// Mutex and condition variable enabling concurrent registering and removing
// of tasks, as well as waiting for background tasks in {CancelAndWait}.
base::ConditionVariable cancelable_tasks_barrier_;
base::Mutex mutex_;
DISALLOW_COPY_AND_ASSIGN(CancelableTaskManager);
};
class Cancelable {
public:
explicit Cancelable(Isolate* isolate);
explicit Cancelable(CancelableTaskManager* parent);
virtual ~Cancelable();
virtual void Cancel() { is_cancelled_ = true; }
// Never invoke this after handing over the task to the platform! The reason
// is that {Cancelable} is used in combination with {v8::Task} and handed to
// a platform. This step transfers ownership to the platform, which destroys
// the task after running it. Since the exact time of destruction is not
// known, we cannot access the object after handing it over.
uint32_t id() { return id_; }
protected:
Isolate* isolate_;
bool is_cancelled_;
bool TryRun() { return status_.TrySetValue(kWaiting, kRunning); }
bool IsRunning() { return status_.Value() == kRunning; }
intptr_t CancelAttempts() { return cancel_counter_.Value(); }
private:
// Identifies the state a cancelable task is in:
// |kWaiting|: The task is scheduled and waiting to be executed. {TryRun} will
// succeed.
// |kCanceled|: The task has been canceled. {TryRun} will fail.
// |kRunning|: The task is currently running and cannot be canceled anymore.
enum Status {
kWaiting,
kCanceled,
kRunning,
};
// Use {CancelableTaskManager} to abort a task that has not yet been
// executed.
bool Cancel() {
if (status_.TrySetValue(kWaiting, kCanceled)) {
return true;
}
cancel_counter_.Increment(1);
return false;
}
CancelableTaskManager* parent_;
AtomicValue<Status> status_;
uint32_t id_;
// The counter is incremented for each failed attempt to cancel the task. It
// can be used by the task itself as an indication of how often external
// entities tried to abort it.
AtomicNumber<intptr_t> cancel_counter_;
friend class CancelableTaskManager;
DISALLOW_COPY_AND_ASSIGN(Cancelable);
};
@@ -33,18 +115,21 @@ class Cancelable {
// Multiple inheritance can be used because Task is a pure interface.
class CancelableTask : public Cancelable, public Task {
public:
explicit CancelableTask(Isolate* isolate) : Cancelable(isolate) {}
explicit CancelableTask(Isolate* isolate);
// Task overrides.
void Run() final {
if (!is_cancelled_) {
if (TryRun()) {
RunInternal();
}
}
virtual void RunInternal() = 0;
Isolate* isolate() { return isolate_; }
private:
Isolate* isolate_;
DISALLOW_COPY_AND_ASSIGN(CancelableTask);
};
@@ -52,18 +137,21 @@ class CancelableTask : public Cancelable, public Task {
// Multiple inheritance can be used because IdleTask is a pure interface.
class CancelableIdleTask : public Cancelable, public IdleTask {
public:
explicit CancelableIdleTask(Isolate* isolate) : Cancelable(isolate) {}
explicit CancelableIdleTask(Isolate* isolate);
// IdleTask overrides.
void Run(double deadline_in_seconds) final {
if (!is_cancelled_) {
if (TryRun()) {
RunInternal(deadline_in_seconds);
}
}
virtual void RunInternal(double deadline_in_seconds) = 0;
Isolate* isolate() { return isolate_; }
private:
Isolate* isolate_;
DISALLOW_COPY_AND_ASSIGN(CancelableIdleTask);
};
......
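
Editorial aside (not part of this commit): the three {TryAbort} outcomes documented in the header above can be made concrete with a small single-threaded model (the real manager guards this with a mutex). SketchManager and SketchTask below are hypothetical names; as in the .cc diff earlier, the entry is removed from the map before {Cancel} is attempted.

#include <cstdint>
#include <cstdio>
#include <map>

// Hypothetical task handle: only a task that is not running can be canceled.
struct SketchTask {
  bool running = false;
  bool Cancel() { return !running; }
};

class SketchManager {
 public:
  uint32_t Register(SketchTask* task) {
    uint32_t id = ++task_id_counter_;  // ids are handed out once, never reused
    tasks_[id] = task;
    return id;
  }

  // (1) unknown or finished id -> false
  // (2) task already running   -> false (entry is removed anyway)
  // (3) task still waiting     -> true, task is canceled and removed
  bool TryAbort(uint32_t id) {
    auto it = tasks_.find(id);
    if (it == tasks_.end()) return false;
    SketchTask* task = it->second;
    tasks_.erase(it);
    return task->Cancel();
  }

 private:
  uint32_t task_id_counter_ = 0;
  std::map<uint32_t, SketchTask*> tasks_;
};

int main() {
  SketchManager manager;
  SketchTask waiting, running;
  running.running = true;
  uint32_t waiting_id = manager.Register(&waiting);
  uint32_t running_id = manager.Register(&running);
  std::printf("abort waiting task: %d\n", manager.TryAbort(waiting_id));  // 1
  std::printf("abort running task: %d\n", manager.TryAbort(running_id));  // 0
  std::printf("abort unknown id:   %d\n", manager.TryAbort(99));          // 0
  return 0;
}
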
@@ -533,10 +533,10 @@ class GlobalHandles::PendingPhantomCallbacksSecondPassTask
}
void RunInternal() override {
isolate_->heap()->CallGCPrologueCallbacks(
isolate()->heap()->CallGCPrologueCallbacks(
GCType::kGCTypeProcessWeakCallbacks, kNoGCCallbackFlags);
InvokeSecondPassPhantomCallbacks(&pending_phantom_callbacks_, isolate_);
isolate_->heap()->CallGCEpilogueCallbacks(
InvokeSecondPassPhantomCallbacks(&pending_phantom_callbacks_, isolate());
isolate()->heap()->CallGCEpilogueCallbacks(
GCType::kGCTypeProcessWeakCallbacks, kNoGCCallbackFlags);
}
......
@@ -91,7 +91,7 @@ void IncrementalMarkingJob::IdleTask::RunInternal(double deadline_in_seconds) {
double deadline_in_ms =
deadline_in_seconds *
static_cast<double>(base::Time::kMillisecondsPerSecond);
Heap* heap = isolate_->heap();
Heap* heap = isolate()->heap();
double start_ms = heap->MonotonicallyIncreasingTimeInMs();
job_->NotifyIdleTask();
job_->NotifyIdleTaskProgress();
@@ -102,7 +102,7 @@ void IncrementalMarkingJob::IdleTask::RunInternal(double deadline_in_seconds) {
double current_time_ms = heap->MonotonicallyIncreasingTimeInMs();
double idle_time_in_ms = deadline_in_ms - start_ms;
double deadline_difference = deadline_in_ms - current_time_ms;
PrintIsolate(isolate_, "%8.0f ms: ", isolate_->time_millis_since_init());
PrintIsolate(isolate(), "%8.0f ms: ", isolate()->time_millis_since_init());
PrintF(
"Idle task: requested idle time %.2f ms, used idle time %.2f "
"ms, deadline usage %.2f ms\n",
@@ -127,7 +127,7 @@ void IncrementalMarkingJob::DelayedTask::Step(Heap* heap) {
void IncrementalMarkingJob::DelayedTask::RunInternal() {
Heap* heap = isolate_->heap();
Heap* heap = isolate()->heap();
job_->NotifyDelayedTask();
IncrementalMarking* incremental_marking = heap->incremental_marking();
if (!incremental_marking->IsStopped()) {
......
@@ -17,7 +17,7 @@ namespace internal {
const double ScavengeJob::kMaxAllocationLimitAsFractionOfNewSpace = 0.8;
void ScavengeJob::IdleTask::RunInternal(double deadline_in_seconds) {
Heap* heap = isolate_->heap();
Heap* heap = isolate()->heap();
double deadline_in_ms =
deadline_in_seconds *
static_cast<double>(base::Time::kMillisecondsPerSecond);
......
@@ -1797,6 +1797,7 @@ Isolate::Isolate(bool enable_serializer)
#endif
use_counter_callback_(NULL),
basic_block_profiler_(NULL),
cancelable_task_manager_(new CancelableTaskManager()),
abort_on_uncaught_exception_callback_(NULL) {
{
base::LockGuard<base::Mutex> lock_guard(thread_data_table_mutex_.Pointer());
@@ -1920,10 +1921,7 @@ void Isolate::Deinit() {
delete basic_block_profiler_;
basic_block_profiler_ = NULL;
for (Cancelable* task : cancelable_tasks_) {
task->Cancel();
}
cancelable_tasks_.clear();
cancelable_task_manager()->CancelAndWait();
heap_.TearDown();
logger_->TearDown();
@@ -2028,6 +2026,9 @@ Isolate::~Isolate() {
delete debug_;
debug_ = NULL;
delete cancelable_task_manager_;
cancelable_task_manager_ = nullptr;
#if USE_SIMULATOR
Simulator::TearDown(simulator_i_cache_, simulator_redirection_);
simulator_i_cache_ = nullptr;
@@ -2798,18 +2799,6 @@ void Isolate::CheckDetachedContextsAfterGC() {
}
void Isolate::RegisterCancelableTask(Cancelable* task) {
cancelable_tasks_.insert(task);
}
void Isolate::RemoveCancelableTask(Cancelable* task) {
auto removed = cancelable_tasks_.erase(task);
USE(removed);
DCHECK(removed == 1);
}
bool StackLimitCheck::JsHasOverflowed(uintptr_t gap) const {
StackGuard* stack_guard = isolate_->stack_guard();
#ifdef USE_SIMULATOR
......
@@ -1093,8 +1093,9 @@ class Isolate {
FutexWaitListNode* futex_wait_list_node() { return &futex_wait_list_node_; }
void RegisterCancelableTask(Cancelable* task);
void RemoveCancelableTask(Cancelable* task);
CancelableTaskManager* cancelable_task_manager() {
return cancelable_task_manager_;
}
interpreter::Interpreter* interpreter() const { return interpreter_; }
@@ -1338,7 +1339,7 @@
FutexWaitListNode futex_wait_list_node_;
std::set<Cancelable*> cancelable_tasks_;
CancelableTaskManager* cancelable_task_manager_;
v8::Isolate::AbortOnUncaughtExceptionCallback
abort_on_uncaught_exception_callback_;
......
// Copyright 2015 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/base/atomicops.h"
#include "src/base/platform/platform.h"
#include "src/cancelable-task.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace v8 {
namespace internal {
namespace {
class TestTask : public Task, public Cancelable {
public:
enum Mode { kDoNothing, kWaitTillCanceledAgain, kCheckNotRun };
TestTask(CancelableTaskManager* parent, base::AtomicWord* result,
Mode mode = kDoNothing)
: Cancelable(parent), result_(result), mode_(mode) {}
// Task overrides.
void Run() final {
if (TryRun()) {
RunInternal();
}
}
private:
void RunInternal() {
base::Release_Store(result_, id());
switch (mode_) {
case kWaitTillCanceledAgain:
// Simple busy wait until the main thread has tried to cancel this task.
while (CancelAttempts() == 0) {
}
break;
case kCheckNotRun:
// Check that we never execute {RunInternal}.
EXPECT_TRUE(false);
break;
default:
break;
}
}
base::AtomicWord* result_;
Mode mode_;
};
class SequentialRunner {
public:
explicit SequentialRunner(TestTask* task) : task_(task) {}
void Run() {
task_->Run();
delete task_;
}
private:
TestTask* task_;
};
class ThreadedRunner final : public base::Thread {
public:
explicit ThreadedRunner(TestTask* task)
: Thread(Options("runner thread")), task_(task) {}
virtual void Run() {
task_->Run();
delete task_;
}
private:
TestTask* task_;
};
typedef base::AtomicWord ResultType;
intptr_t GetValue(ResultType* result) { return base::Acquire_Load(result); }
} // namespace
TEST(CancelableTask, EmptyCancelableTaskManager) {
CancelableTaskManager manager;
manager.CancelAndWait();
}
TEST(CancelableTask, SequentialCancelAndWait) {
CancelableTaskManager manager;
ResultType result1 = 0;
SequentialRunner runner1(
new TestTask(&manager, &result1, TestTask::kCheckNotRun));
EXPECT_EQ(GetValue(&result1), 0);
manager.CancelAndWait();
EXPECT_EQ(GetValue(&result1), 0);
runner1.Run(); // Run to avoid leaking the Task.
EXPECT_EQ(GetValue(&result1), 0);
}
TEST(CancelableTask, SequentialMultipleTasks) {
CancelableTaskManager manager;
ResultType result1 = 0;
ResultType result2 = 0;
TestTask* task1 = new TestTask(&manager, &result1);
TestTask* task2 = new TestTask(&manager, &result2);
SequentialRunner runner1(task1);
SequentialRunner runner2(task2);
EXPECT_EQ(task1->id(), 1);
EXPECT_EQ(task2->id(), 2);
EXPECT_EQ(GetValue(&result1), 0);
runner1.Run(); // Don't touch task1 after running it.
EXPECT_EQ(GetValue(&result1), 1);
EXPECT_EQ(GetValue(&result2), 0);
runner2.Run(); // Don't touch task2 after running it.
EXPECT_EQ(GetValue(&result2), 2);
manager.CancelAndWait();
EXPECT_FALSE(manager.TryAbort(1));
EXPECT_FALSE(manager.TryAbort(2));
}
TEST(CancelableTask, ThreadedMultipleTasksStarted) {
CancelableTaskManager manager;
ResultType result1 = 0;
ResultType result2 = 0;
TestTask* task1 =
new TestTask(&manager, &result1, TestTask::kWaitTillCanceledAgain);
TestTask* task2 =
new TestTask(&manager, &result2, TestTask::kWaitTillCanceledAgain);
ThreadedRunner runner1(task1);
ThreadedRunner runner2(task2);
runner1.Start();
runner2.Start();
// Busy wait on result to make sure both tasks are done.
while ((GetValue(&result1) == 0) || (GetValue(&result2) == 0)) {
}
manager.CancelAndWait();
runner1.Join();
runner2.Join();
EXPECT_EQ(GetValue(&result1), 1);
EXPECT_EQ(GetValue(&result2), 2);
}
TEST(CancelableTask, ThreadedMultipleTasksNotRun) {
CancelableTaskManager manager;
ResultType result1 = 0;
ResultType result2 = 0;
TestTask* task1 = new TestTask(&manager, &result1, TestTask::kCheckNotRun);
TestTask* task2 = new TestTask(&manager, &result2, TestTask::kCheckNotRun);
ThreadedRunner runner1(task1);
ThreadedRunner runner2(task2);
manager.CancelAndWait();
// Tasks are canceled, hence the runners will bail out and not update the results.
runner1.Start();
runner2.Start();
runner1.Join();
runner2.Join();
EXPECT_EQ(GetValue(&result1), 0);
EXPECT_EQ(GetValue(&result2), 0);
}
TEST(CancelableTask, RemoveBeforeCancelAndWait) {
CancelableTaskManager manager;
ResultType result1 = 0;
TestTask* task1 = new TestTask(&manager, &result1, TestTask::kCheckNotRun);
ThreadedRunner runner1(task1);
uint32_t id = task1->id();
EXPECT_EQ(id, 1);
EXPECT_TRUE(manager.TryAbort(id));
runner1.Start();
runner1.Join();
manager.CancelAndWait();
EXPECT_EQ(GetValue(&result1), 0);
}
TEST(CancelableTask, RemoveAfterCancelAndWait) {
CancelableTaskManager manager;
ResultType result1 = 0;
TestTask* task1 = new TestTask(&manager, &result1);
ThreadedRunner runner1(task1);
uint32_t id = task1->id();
EXPECT_EQ(id, 1);
runner1.Start();
runner1.Join();
manager.CancelAndWait();
EXPECT_FALSE(manager.TryAbort(id));
EXPECT_EQ(GetValue(&result1), 1);
}
TEST(CancelableTask, RemoveUnmanagedId) {
CancelableTaskManager manager;
EXPECT_FALSE(manager.TryAbort(1));
EXPECT_FALSE(manager.TryAbort(2));
manager.CancelAndWait();
EXPECT_FALSE(manager.TryAbort(1));
EXPECT_FALSE(manager.TryAbort(3));
}
} // namespace internal
} // namespace v8
@@ -42,6 +42,7 @@
'base/platform/time-unittest.cc',
'base/sys-info-unittest.cc',
'base/utils/random-number-generator-unittest.cc',
'cancelable-tasks-unittest.cc',
'char-predicates-unittest.cc',
'compiler/binary-operator-reducer-unittest.cc',
'compiler/branch-elimination-unittest.cc',
......