Commit 26dad80f authored by Ben L. Titzer's avatar Ben L. Titzer Committed by Commit Bot

[d8] Cleanup message queues

Simplifies some of the logic of message queues in d8 and makes sure
to delete any in-flight messages upon worker termination. Drive-by
cleanups of some other small d8 vestiges.

R=clemensh@chromium.org
BUG=v8:9524

Change-Id: I587c0cb3eeed88107e7dba552389057f07c15c43
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/1710673
Commit-Queue: Ben Titzer <titzer@chromium.org>
Reviewed-by: 's avatarClemens Hammacher <clemensh@chromium.org>
Cr-Commit-Position: refs/heads/master@{#62873}
parent 02c81cbe
...@@ -1406,16 +1406,16 @@ void Shell::WorkerNew(const v8::FunctionCallbackInfo<v8::Value>& args) { ...@@ -1406,16 +1406,16 @@ void Shell::WorkerNew(const v8::FunctionCallbackInfo<v8::Value>& args) {
if (!allow_new_workers_) return; if (!allow_new_workers_) return;
Worker* worker = new Worker;
args.Holder()->SetAlignedPointerInInternalField(0, worker);
workers_.push_back(worker);
String::Utf8Value script(args.GetIsolate(), source); String::Utf8Value script(args.GetIsolate(), source);
if (!*script) { if (!*script) {
Throw(args.GetIsolate(), "Can't get worker script"); Throw(args.GetIsolate(), "Can't get worker script");
return; return;
} }
worker->StartExecuteInThread(*script);
Worker* worker = new Worker(*script);
args.Holder()->SetAlignedPointerInInternalField(0, worker);
workers_.push_back(worker);
worker->StartExecuteInThread();
} }
} }
...@@ -2441,7 +2441,7 @@ bool SourceGroup::Execute(Isolate* isolate) { ...@@ -2441,7 +2441,7 @@ bool SourceGroup::Execute(Isolate* isolate) {
Local<String> file_name = Local<String> file_name =
String::NewFromUtf8(isolate, arg, NewStringType::kNormal) String::NewFromUtf8(isolate, arg, NewStringType::kNormal)
.ToLocalChecked(); .ToLocalChecked();
Local<String> source = ReadFile(isolate, arg); Local<String> source = Shell::ReadFile(isolate, arg);
if (source.IsEmpty()) { if (source.IsEmpty()) {
printf("Error reading '%s'\n", arg); printf("Error reading '%s'\n", arg);
base::OS::ExitProcess(1); base::OS::ExitProcess(1);
...@@ -2457,10 +2457,6 @@ bool SourceGroup::Execute(Isolate* isolate) { ...@@ -2457,10 +2457,6 @@ bool SourceGroup::Execute(Isolate* isolate) {
return success; return success;
} }
Local<String> SourceGroup::ReadFile(Isolate* isolate, const char* name) {
return Shell::ReadFile(isolate, name);
}
SourceGroup::IsolateThread::IsolateThread(SourceGroup* group) SourceGroup::IsolateThread::IsolateThread(SourceGroup* group)
: base::Thread(GetThreadOptions("IsolateThread")), group_(group) {} : base::Thread(GetThreadOptions("IsolateThread")), group_(group) {}
...@@ -2527,73 +2523,56 @@ ExternalizedContents::~ExternalizedContents() { ...@@ -2527,73 +2523,56 @@ ExternalizedContents::~ExternalizedContents() {
void SerializationDataQueue::Enqueue(std::unique_ptr<SerializationData> data) { void SerializationDataQueue::Enqueue(std::unique_ptr<SerializationData> data) {
base::MutexGuard lock_guard(&mutex_); base::MutexGuard lock_guard(&mutex_);
data_.push_back(std::move(data)); if (!live_) return;
} data_.emplace_back(std::move(data));
not_empty_.NotifyOne();
bool SerializationDataQueue::Dequeue(
std::unique_ptr<SerializationData>* out_data) {
out_data->reset();
base::MutexGuard lock_guard(&mutex_);
if (data_.empty()) return false;
*out_data = std::move(data_[0]);
data_.erase(data_.begin());
return true;
} }
bool SerializationDataQueue::IsEmpty() { std::unique_ptr<SerializationData> SerializationDataQueue::Dequeue() {
base::MutexGuard lock_guard(&mutex_); base::MutexGuard lock_guard(&mutex_);
return data_.empty(); while (live_ && data_.empty()) not_empty_.Wait(&mutex_);
if (!live_) return {};
auto result = std::move(data_.front());
data_.pop_front();
return result;
} }
void SerializationDataQueue::Clear() { void SerializationDataQueue::Kill() {
base::MutexGuard lock_guard(&mutex_); base::MutexGuard lock_guard(&mutex_);
live_ = false;
data_.clear(); data_.clear();
not_empty_.NotifyAll();
} }
Worker::Worker() Worker::Worker(const char* script)
: in_semaphore_(0), : thread_(nullptr), script_(nullptr), running_(false) {
out_semaphore_(0), script_ = i::StrDup(script);
thread_(nullptr), }
script_(nullptr),
running_(false) {}
Worker::~Worker() { Worker::~Worker() {
delete thread_; delete thread_;
thread_ = nullptr; thread_ = nullptr;
delete[] script_; delete[] script_;
script_ = nullptr; script_ = nullptr;
in_queue_.Clear();
out_queue_.Clear();
} }
void Worker::StartExecuteInThread(const char* script) { void Worker::StartExecuteInThread() {
running_ = true; running_ = true;
script_ = i::StrDup(script);
thread_ = new WorkerThread(this); thread_ = new WorkerThread(this);
thread_->Start(); thread_->Start();
} }
void Worker::PostMessage(std::unique_ptr<SerializationData> data) { void Worker::PostMessage(std::unique_ptr<SerializationData> data) {
in_queue_.Enqueue(std::move(data)); in_queue_.Enqueue(std::move(data));
in_semaphore_.Signal();
} }
std::unique_ptr<SerializationData> Worker::GetMessage() { std::unique_ptr<SerializationData> Worker::GetMessage() {
std::unique_ptr<SerializationData> result; return out_queue_.Dequeue();
while (!out_queue_.Dequeue(&result)) {
// If the worker is no longer running, and there are no messages in the
// queue, don't expect any more messages from it.
if (!base::Relaxed_Load(&running_)) break;
out_semaphore_.Wait();
}
return result;
} }
void Worker::Terminate() { void Worker::Terminate() {
base::Relaxed_Store(&running_, false); in_queue_.Kill();
// Post nullptr to wake the Worker thread message loop, and tell it to stop out_queue_.Kill();
// running.
PostMessage(nullptr);
} }
void Worker::WaitForThread() { void Worker::WaitForThread() {
...@@ -2659,12 +2638,8 @@ void Worker::ExecuteInThread() { ...@@ -2659,12 +2638,8 @@ void Worker::ExecuteInThread() {
Local<Function> onmessage_fun = Local<Function>::Cast(onmessage); Local<Function> onmessage_fun = Local<Function>::Cast(onmessage);
// Now wait for messages // Now wait for messages
while (true) { while (true) {
in_semaphore_.Wait(); std::unique_ptr<SerializationData> data = in_queue_.Dequeue();
std::unique_ptr<SerializationData> data; if (!data) break; // nullptr indicates termination.
if (!in_queue_.Dequeue(&data)) continue;
if (!data) {
break;
}
v8::TryCatch try_catch(isolate); v8::TryCatch try_catch(isolate);
Local<Value> value; Local<Value> value;
if (Shell::DeserializeValue(isolate, std::move(data)) if (Shell::DeserializeValue(isolate, std::move(data))
...@@ -2686,10 +2661,8 @@ void Worker::ExecuteInThread() { ...@@ -2686,10 +2661,8 @@ void Worker::ExecuteInThread() {
Shell::CollectGarbage(isolate); Shell::CollectGarbage(isolate);
} }
isolate->Dispose(); isolate->Dispose();
in_queue_.Kill();
// Post nullptr to wake the thread waiting on GetMessage() if there is one. out_queue_.Kill();
out_queue_.Enqueue(nullptr);
out_semaphore_.Signal();
} }
void Worker::PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args) { void Worker::PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args) {
...@@ -2710,7 +2683,6 @@ void Worker::PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args) { ...@@ -2710,7 +2683,6 @@ void Worker::PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args) {
Local<External> this_value = Local<External>::Cast(args.Data()); Local<External> this_value = Local<External>::Cast(args.Data());
Worker* worker = static_cast<Worker*>(this_value->Value()); Worker* worker = static_cast<Worker*>(this_value->Value());
worker->out_queue_.Enqueue(std::move(data)); worker->out_queue_.Enqueue(std::move(data));
worker->out_semaphore_.Signal();
} }
} }
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include <vector> #include <vector>
#include "src/base/once.h" #include "src/base/once.h"
#include "src/base/platform/condition-variable.h"
#include "src/base/platform/time.h" #include "src/base/platform/time.h"
#include "src/d8/async-hooks-wrapper.h" #include "src/d8/async-hooks-wrapper.h"
#include "src/strings/string-hasher.h" #include "src/strings/string-hasher.h"
...@@ -102,9 +103,6 @@ class SourceGroup { ...@@ -102,9 +103,6 @@ class SourceGroup {
base::Semaphore done_semaphore_; base::Semaphore done_semaphore_;
base::Thread* thread_; base::Thread* thread_;
void ExitShell(int exit_code);
Local<String> ReadFile(Isolate* isolate, const char* name);
const char** argv_; const char** argv_;
int begin_offset_; int begin_offset_;
int end_offset_; int end_offset_;
...@@ -195,24 +193,31 @@ class SerializationData { ...@@ -195,24 +193,31 @@ class SerializationData {
class SerializationDataQueue { class SerializationDataQueue {
public: public:
// Enqueue the given data.
void Enqueue(std::unique_ptr<SerializationData> data); void Enqueue(std::unique_ptr<SerializationData> data);
bool Dequeue(std::unique_ptr<SerializationData>* data);
bool IsEmpty(); // Wait for data to arrive. Returns {nullptr} if this queue is {Kill()'d}
void Clear(); // in the course of waiting.
std::unique_ptr<SerializationData> Dequeue();
// Kill this queue, canceling any pending dequeue requests.
void Kill();
private: private:
base::Mutex mutex_; base::Mutex mutex_;
std::vector<std::unique_ptr<SerializationData>> data_; bool live_ = true;
std::deque<std::unique_ptr<SerializationData>> data_;
base::ConditionVariable not_empty_;
}; };
class Worker { class Worker {
public: public:
Worker(); explicit Worker(const char* script);
~Worker(); ~Worker();
// Run the given script on this Worker. This function should only be called // Run this Worker. This function should only be called once, and should only
// once, and should only be called by the thread that created the Worker. // be called by the thread that created the Worker.
void StartExecuteInThread(const char* script); void StartExecuteInThread();
// Post a message to the worker's incoming message queue. The worker will // Post a message to the worker's incoming message queue. The worker will
// take ownership of the SerializationData. // take ownership of the SerializationData.
// This function should only be called by the thread that created the Worker. // This function should only be called by the thread that created the Worker.
...@@ -247,12 +252,10 @@ class Worker { ...@@ -247,12 +252,10 @@ class Worker {
void ExecuteInThread(); void ExecuteInThread();
static void PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args); static void PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args);
base::Semaphore in_semaphore_;
base::Semaphore out_semaphore_;
SerializationDataQueue in_queue_; SerializationDataQueue in_queue_;
SerializationDataQueue out_queue_; SerializationDataQueue out_queue_;
base::Thread* thread_; WorkerThread* thread_;
char* script_; const char* script_;
base::Atomic32 running_; base::Atomic32 running_;
}; };
......
// Copyright 2015 the V8 project authors. All rights reserved. // Copyright 2015 the V8 project authors. All rights reserved.
// Redistribution and use in source and binary forms, with or without // Use of this source code is governed by a BSD-style license that can be
// modification, are permitted provided that the following conditions are // found in the LICENSE file.
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following
// disclaimer in the documentation and/or other materials provided
// with the distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// Verify that the Worker constrcutor by default treats its first argument
// as the filename of a script load and run.
// Resources: test/mjsunit/d8/d8-worker-script.txt // Resources: test/mjsunit/d8/d8-worker-script.txt
if (this.Worker) { if (this.Worker) {
// Verify that the Worker constructor by default treats its first argument
// as the filename of a script load and run.
var w = new Worker('test/mjsunit/d8/d8-worker-script.txt'); var w = new Worker('test/mjsunit/d8/d8-worker-script.txt');
assertEquals("Starting worker", w.getMessage()); assertEquals("Starting worker", w.getMessage());
w.postMessage(""); w.postMessage("");
assertEquals("DONE", w.getMessage()); assertEquals("DONE", w.getMessage());
w.terminate(); w.terminate();
try {
var w = new Worker('test/mjsunit/d8/not-found.txt');
assertFalse(true);
} catch (e) {
// should not be able to find this script.
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment