Commit 1280954d authored by Mircea Trofin, committed by Commit Bot

[wasm] Throttle the amount of unfinished work to avoid OOM

It is possible that the foreground task is unable to clear the
scheduled unfinished work, eventually leading to an OOM.

We use either the code_range size (on 64 bit) or the capacity of the code
space as a heuristic for how much memory to use for compilation.

Bug: v8:6492, chromium:732010
Change-Id: I1e4c0825351a42fa0b8369ccc41800ac3445563d
Reviewed-on: https://chromium-review.googlesource.com/535017
Commit-Queue: Brad Nelson <bradnelson@chromium.org>
Reviewed-by: Brad Nelson <bradnelson@chromium.org>
Cr-Commit-Position: refs/heads/master@{#46017}
parent 631c429f
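The change boils down to a memory-based throttle on compilation units that have been executed but not yet finalized. Below is a minimal standalone sketch of that scheme, assuming a single fixed threshold and using std::mutex/std::condition_variable in place of V8's base::Mutex/base::ConditionVariable; the member names mirror the diff that follows, but this is an illustration, not the committed code.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Sketch of the throttling idea from CodeGenerationSchedule: producers
// account the memory cost of each scheduled unit, wait whenever the
// accumulated cost exceeds max_memory_, and are woken as the finisher
// drains the queue. Standard-library primitives stand in for V8's
// base::Mutex / base::ConditionVariable / base::AtomicNumber.
class ThrottledSchedule {
 public:
  explicit ThrottledSchedule(size_t max_memory) : max_memory_(max_memory) {}

  // Must be called before producer threads start (as StartCompilationTasks
  // does in the diff below); afterwards the threshold is enforced.
  void EnableThrottling() { throttle_ = true; }

  // Called by a producer after compiling a unit of known memory cost.
  void Schedule(size_t cost) {
    std::lock_guard<std::mutex> guard(mutex_);
    allocated_memory_ += cost;
  }

  // Producers call this before picking up new work; blocks while the
  // outstanding (unfinalized) memory is above the threshold.
  void WaitUntilCanWork() {
    std::unique_lock<std::mutex> lock(mutex_);
    accept_work_.wait(lock, [this] {
      return !throttle_ || allocated_memory_ <= max_memory_;
    });
  }

  // Called by the finisher once a unit is converted to code and its
  // compile-time memory is released; wakes one waiting producer.
  void Release(size_t cost) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      allocated_memory_ -= cost;
    }
    accept_work_.notify_one();
  }

 private:
  const size_t max_memory_;   // e.g. code_range size / 2 in the diff below
  bool throttle_ = false;
  size_t allocated_memory_ = 0;
  std::mutex mutex_;
  std::condition_variable accept_work_;
};
```

In the actual change the counter is a base::AtomicNumber<size_t> read outside any lock, so the threshold is deliberately soft: several producers may overshoot it at once, but the overshoot is bounded by the number of compilation threads, and the limit itself is half of the code_range size (or of the code space capacity).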
@@ -208,6 +208,7 @@ class V8_EXPORT_PRIVATE CompilationJob {
   State state() const { return state_; }
   CompilationInfo* info() const { return info_; }
   Isolate* isolate() const;
+  virtual size_t AllocatedMemory() const { return 0; }

  protected:
   // Overridden by the actual implementation.
@@ -690,6 +690,8 @@ class PipelineWasmCompilationJob final : public CompilationJob {
   Status FinalizeJobImpl() final;

  private:
+  size_t AllocatedMemory() const override;
+
   ZoneStats zone_stats_;
   std::unique_ptr<PipelineStatistics> pipeline_statistics_;
   PipelineData data_;
@@ -736,6 +738,10 @@ PipelineWasmCompilationJob::ExecuteJobImpl() {
   return SUCCEEDED;
 }

+size_t PipelineWasmCompilationJob::AllocatedMemory() const {
+  return pipeline_.data_->zone_stats()->GetCurrentAllocatedBytes();
+}
+
 PipelineWasmCompilationJob::Status
 PipelineWasmCompilationJob::FinalizeJobImpl() {
   pipeline_.AssembleCode(&linkage_);
@@ -3975,6 +3975,12 @@ void WasmCompilationUnit::ExecuteCompilation() {
         isolate_->counters()->wasm_compile_function_time());
   }
   ExecuteCompilationInternal();
+  // Record the memory cost this unit places on the system until
+  // it is finalized. That may be "0" in error cases.
+  if (job_) {
+    size_t cost = job_->AllocatedMemory();
+    set_memory_cost(cost);
+  }
 }

 void WasmCompilationUnit::ExecuteCompilationInternal() {
@@ -64,6 +64,9 @@ class WasmCompilationUnit final {
                       wasm::ModuleBytesEnv* module_env,
                       const wasm::WasmFunction* function);

+  void set_memory_cost(size_t memory_cost) { memory_cost_ = memory_cost; }
+  size_t memory_cost() const { return memory_cost_; }
+
  private:
   SourcePositionTable* BuildGraphForWasmFunction(double* decode_ms);
@@ -85,7 +88,7 @@ class WasmCompilationUnit final {
   int func_index_;
   wasm::Result<wasm::DecodeStruct*> graph_construction_result_;
   bool ok_ = true;
+  size_t memory_cost_ = 0;

   void ExecuteCompilationInternal();

   DISALLOW_COPY_AND_ASSIGN(WasmCompilationUnit);
@@ -68,11 +68,11 @@ ZoneStats::~ZoneStats() {
   DCHECK(stats_.empty());
 }

-size_t ZoneStats::GetMaxAllocatedBytes() {
+size_t ZoneStats::GetMaxAllocatedBytes() const {
   return std::max(max_allocated_bytes_, GetCurrentAllocatedBytes());
 }

-size_t ZoneStats::GetCurrentAllocatedBytes() {
+size_t ZoneStats::GetCurrentAllocatedBytes() const {
   size_t total = 0;
   for (Zone* zone : zones_) {
     total += static_cast<size_t>(zone->allocation_size());
@@ -80,7 +80,7 @@ size_t ZoneStats::GetCurrentAllocatedBytes() {
   return total;
 }

-size_t ZoneStats::GetTotalAllocatedBytes() {
+size_t ZoneStats::GetTotalAllocatedBytes() const {
   return total_deleted_bytes_ + GetCurrentAllocatedBytes();
 }
@@ -66,9 +66,9 @@ class V8_EXPORT_PRIVATE ZoneStats final {
   explicit ZoneStats(AccountingAllocator* allocator);
   ~ZoneStats();

-  size_t GetMaxAllocatedBytes();
-  size_t GetTotalAllocatedBytes();
-  size_t GetCurrentAllocatedBytes();
+  size_t GetMaxAllocatedBytes() const;
+  size_t GetTotalAllocatedBytes() const;
+  size_t GetCurrentAllocatedBytes() const;

  private:
   Zone* NewEmptyZone(const char* zone_name);
@@ -39,14 +39,31 @@ namespace internal {
 namespace wasm {

 ModuleCompiler::CodeGenerationSchedule::CodeGenerationSchedule(
-    base::RandomNumberGenerator* random_number_generator)
-    : random_number_generator_(random_number_generator) {
+    base::RandomNumberGenerator* random_number_generator, size_t max_memory)
+    : random_number_generator_(random_number_generator),
+      max_memory_(max_memory) {
   DCHECK_NOT_NULL(random_number_generator_);
+  DCHECK_GT(max_memory_, 0);
 }

 void ModuleCompiler::CodeGenerationSchedule::Schedule(
     std::unique_ptr<compiler::WasmCompilationUnit>&& item) {
+  size_t cost = item->memory_cost();
   schedule_.push_back(std::move(item));
+  allocated_memory_.Increment(cost);
+}
+
+void ModuleCompiler::CodeGenerationSchedule::WaitUntilCanWork() {
+  if (!throttle_ || allocated_memory_.Value() <= max_memory_) return;
+  base::LockGuard<base::Mutex> guard(&accept_work_mutex_);
+  while (throttle_ && allocated_memory_.Value() > max_memory_) {
+    accept_work_.Wait(&accept_work_mutex_);
+  }
+  // We waited for the used memory to drop, and maybe other threads were
+  // waiting, too. Notify another one. See the note about the threshold being
+  // exceeded by more than one thread in {FetchAndExecuteCompilationUnit}.
+  accept_work_.NotifyOne();
 }

 std::unique_ptr<compiler::WasmCompilationUnit>
@@ -56,6 +73,12 @@ ModuleCompiler::CodeGenerationSchedule::GetNext() {
   auto ret = std::move(schedule_[index]);
   std::swap(schedule_[schedule_.size() - 1], schedule_[index]);
   schedule_.pop_back();
+  if (throttle_ &&
+      allocated_memory_.Decrement(ret->memory_cost()) <= max_memory_) {
+    // There may be threads waiting. Notify one; in turn, it will notify
+    // another. This avoids all the waiting threads contending for the mutex.
+    accept_work_.NotifyOne();
+  }
   return ret;
 }
@@ -73,7 +96,12 @@ ModuleCompiler::ModuleCompiler(Isolate* isolate,
       module_(std::move(module)),
       counters_shared_(isolate->counters_shared()),
       is_sync_(is_sync),
-      executed_units_(isolate->random_number_generator()) {
+      executed_units_(
+          isolate->random_number_generator(),
+          (isolate->heap()->memory_allocator()->code_range()->valid()
+               ? isolate->heap()->memory_allocator()->code_range()->size()
+               : isolate->heap()->code_space()->Capacity()) /
+              2) {
   counters_ = counters_shared_.get();
 }
@@ -87,10 +115,32 @@ void ModuleCompiler::CompilationTask::RunInternal() {
   compiler_->module_->pending_tasks.get()->Signal();
 }

-// Run by each compilation task and by the main thread. The
-// no_finisher_callback is called within the result_mutex_ lock when no
-// finishing task is running, i.e. when the finisher_is_running_ flag is not
-// set.
+void ModuleCompiler::CompileOnMainThread() {
+  DisallowHeapAllocation no_allocation;
+  DisallowHandleAllocation no_handles;
+  DisallowHandleDereference no_deref;
+  DisallowCodeDependencyChange no_dependency_change;
+
+  // - 1 because AtomicIncrement returns the value after the atomic increment.
+  size_t index = next_unit_.Increment(1) - 1;
+  if (index >= compilation_units_.size()) {
+    return;
+  }
+
+  std::unique_ptr<compiler::WasmCompilationUnit> unit =
+      std::move(compilation_units_.at(index));
+  unit->ExecuteCompilation();
+
+  // Schedule the result, bypassing the threshold.
+  {
+    base::LockGuard<base::Mutex> guard(&result_mutex_);
+    executed_units_.Schedule(std::move(unit));
+  }
+}
+
+// Run by each compilation task. The no_finisher_callback is called
+// within the result_mutex_ lock when no finishing task is running,
+// i.e. when the finisher_is_running_ flag is not set.
 bool ModuleCompiler::FetchAndExecuteCompilationUnit(
     std::function<void()> no_finisher_callback) {
   DisallowHeapAllocation no_allocation;
@@ -99,11 +149,28 @@ bool ModuleCompiler::FetchAndExecuteCompilationUnit(
   DisallowCodeDependencyChange no_dependency_change;

   // - 1 because AtomicIncrement returns the value after the atomic increment.
+  // Bail out fast if there's no work to do.
   size_t index = next_unit_.Increment(1) - 1;
   if (index >= compilation_units_.size()) {
     return false;
   }

+  // TODO(wasm): this is a no-op for async. Hopefully we can
+  // avoid it if we unify the parallel and async pipelines.
+  // The thread checks whether the memory threshold has been passed. It is
+  // possible that a few threads make this check concurrently, when a single
+  // one of them would already have been enough to go over the threshold.
+  // We don't know ahead of time how much memory will be allocated at compile
+  // time, and we don't want to hold a critical section until we find out -
+  // since the whole point is for compilation to happen in parallel. So we
+  // tolerate some elasticity in the system: it's OK if the threshold is
+  // exceeded because a few units are scheduled that way - the extra amount
+  // is bounded by the number of threads.
+  // We can lower the threshold if the current margin heuristic proves too
+  // high.
+  executed_units_.WaitUntilCanWork();
+
   std::unique_ptr<compiler::WasmCompilationUnit> unit =
       std::move(compilation_units_.at(index));
   unit->ExecuteCompilation();
@@ -140,6 +207,8 @@ uint32_t* ModuleCompiler::StartCompilationTasks() {
   num_background_tasks_ =
       Min(static_cast<size_t>(FLAG_wasm_num_compilation_tasks),
           V8::GetCurrentPlatform()->NumberOfAvailableBackgroundThreads());
+  executed_units_.EnableThrottling();
+
   uint32_t* task_ids = new uint32_t[num_background_tasks_];
   for (size_t i = 0; i < num_background_tasks_; ++i) {
     CompilationTask* task = new CompilationTask(this);
@@ -161,16 +230,19 @@ void ModuleCompiler::WaitForCompilationTasks(uint32_t* task_ids) {
   }
 }

-void ModuleCompiler::FinishCompilationUnits(std::vector<Handle<Code>>& results,
-                                            ErrorThrower* thrower) {
+size_t ModuleCompiler::FinishCompilationUnits(
+    std::vector<Handle<Code>>& results, ErrorThrower* thrower) {
+  size_t finished = 0;
   SetFinisherIsRunning(true);
   while (true) {
     int func_index = -1;
     Handle<Code> result = FinishCompilationUnit(thrower, &func_index);
     if (func_index < 0) break;
     results[func_index] = result;
+    ++finished;
   }
   SetFinisherIsRunning(false);
+  return finished;
 }

 void ModuleCompiler::SetFinisherIsRunning(bool value) {
@@ -221,30 +293,30 @@ void ModuleCompiler::CompileInParallel(ModuleBytesEnv* module_env,
   //    and stores them in the vector {compilation_units}.
   InitializeParallelCompilation(module->functions, *module_env);

-  // Objects for the synchronization with the background threads.
-  base::AtomicNumber<size_t> next_unit(
-      static_cast<size_t>(FLAG_skip_compiling_wasm_funcs));
-
   // 2) The main thread spawns {CompilationTask} instances which run on
   //    the background threads.
   std::unique_ptr<uint32_t[]> task_ids(StartCompilationTasks());

+  size_t finished_functions = 0;
+  while (finished_functions < compilation_units_.size()) {
     // 3.a) The background threads and the main thread pick one compilation
     //      unit at a time and execute the parallel phase of the compilation
     //      unit. After finishing the execution of the parallel phase, the
     //      result is enqueued in {executed_units}.
-  while (FetchAndExecuteCompilationUnit()) {
+    // The foreground task bypasses waiting on the memory threshold, because
+    // its results will immediately be converted to code (below).
+    CompileOnMainThread();
     // 3.b) If {executed_units} contains a compilation unit, the main thread
     //      dequeues it and finishes the compilation unit. Compilation units
     //      are finished concurrently to the background threads to save
     //      memory.
-    FinishCompilationUnits(results, thrower);
+    finished_functions += FinishCompilationUnits(results, thrower);
   }

   // 4) After the parallel phase of all compilation units has started, the
-  //    main thread waits for all {CompilationTask} instances to finish.
+  //    main thread waits for all {CompilationTask} instances to finish - which
+  //    happens once they all realize there's no next work item to process.
   WaitForCompilationTasks(task_ids.get());
-
-  // Finish the compilation of the remaining compilation units.
-  FinishCompilationUnits(results, thrower);
 }

 void ModuleCompiler::CompileSequentially(ModuleBytesEnv* module_env,
@@ -42,7 +42,8 @@ class ModuleCompiler {
   class CodeGenerationSchedule {
    public:
     explicit CodeGenerationSchedule(
-        base::RandomNumberGenerator* random_number_generator);
+        base::RandomNumberGenerator* random_number_generator,
+        size_t max_memory = 0);

     void Schedule(std::unique_ptr<compiler::WasmCompilationUnit>&& item);
@@ -50,11 +51,20 @@ class ModuleCompiler {
     std::unique_ptr<compiler::WasmCompilationUnit> GetNext();

+    void WaitUntilCanWork();
+
+    void EnableThrottling() { throttle_ = true; }
+
    private:
     size_t GetRandomIndexInSchedule();

     base::RandomNumberGenerator* random_number_generator_ = nullptr;
     std::vector<std::unique_ptr<compiler::WasmCompilationUnit>> schedule_;
+    base::Mutex accept_work_mutex_;
+    base::ConditionVariable accept_work_;
+    const size_t max_memory_;
+    bool throttle_ = false;
+    base::AtomicNumber<size_t> allocated_memory_{0};
   };

   Isolate* isolate_;
@@ -78,6 +88,8 @@ class ModuleCompiler {
   bool FetchAndExecuteCompilationUnit(
       std::function<void()> no_finisher_callback = [] {});

+  void CompileOnMainThread();
+
   size_t InitializeParallelCompilation(
       const std::vector<WasmFunction>& functions, ModuleBytesEnv& module_env);
@@ -85,7 +97,7 @@ class ModuleCompiler {
   void WaitForCompilationTasks(uint32_t* task_ids);

-  void FinishCompilationUnits(std::vector<Handle<Code>>& results,
+  size_t FinishCompilationUnits(std::vector<Handle<Code>>& results,
                               ErrorThrower* thrower);

   void SetFinisherIsRunning(bool value);