Commit 62c9ad5b authored by Maya Lekova's avatar Maya Lekova Committed by Commit Bot

Revert "[d8] Cleanup message queues"

This reverts commit 26dad80f.

Reason for revert: Breaks d8, see https://ci.chromium.org/p/v8/builders/ci/V8%20Win32/22272

Original change's description:
> [d8] Cleanup message queues
> 
> Simplifies some of the logic of message queues in d8 and makes sure
> to delete any in-flight messages upon worker termination. Drive-by
> cleanups of some other small d8 vestiges.
> 
> R=​clemensh@chromium.org
> BUG=v8:9524
> 
> Change-Id: I587c0cb3eeed88107e7dba552389057f07c15c43
> Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/1710673
> Commit-Queue: Ben Titzer <titzer@chromium.org>
> Reviewed-by: Clemens Hammacher <clemensh@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#62873}

TBR=titzer@chromium.org,clemensh@chromium.org

Change-Id: Ibc15d9fb76698a2bad51e3842392634fb2f0246b
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: v8:9524
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/1714877Reviewed-by: 's avatarMaya Lekova <mslekova@chromium.org>
Commit-Queue: Maya Lekova <mslekova@chromium.org>
Cr-Commit-Position: refs/heads/master@{#62875}
parent c0943a50
......@@ -1406,16 +1406,16 @@ void Shell::WorkerNew(const v8::FunctionCallbackInfo<v8::Value>& args) {
if (!allow_new_workers_) return;
Worker* worker = new Worker;
args.Holder()->SetAlignedPointerInInternalField(0, worker);
workers_.push_back(worker);
String::Utf8Value script(args.GetIsolate(), source);
if (!*script) {
Throw(args.GetIsolate(), "Can't get worker script");
return;
}
Worker* worker = new Worker(*script);
args.Holder()->SetAlignedPointerInInternalField(0, worker);
workers_.push_back(worker);
worker->StartExecuteInThread();
worker->StartExecuteInThread(*script);
}
}
......@@ -2441,7 +2441,7 @@ bool SourceGroup::Execute(Isolate* isolate) {
Local<String> file_name =
String::NewFromUtf8(isolate, arg, NewStringType::kNormal)
.ToLocalChecked();
Local<String> source = Shell::ReadFile(isolate, arg);
Local<String> source = ReadFile(isolate, arg);
if (source.IsEmpty()) {
printf("Error reading '%s'\n", arg);
base::OS::ExitProcess(1);
......@@ -2457,6 +2457,10 @@ bool SourceGroup::Execute(Isolate* isolate) {
return success;
}
Local<String> SourceGroup::ReadFile(Isolate* isolate, const char* name) {
return Shell::ReadFile(isolate, name);
}
SourceGroup::IsolateThread::IsolateThread(SourceGroup* group)
: base::Thread(GetThreadOptions("IsolateThread")), group_(group) {}
......@@ -2523,56 +2527,73 @@ ExternalizedContents::~ExternalizedContents() {
void SerializationDataQueue::Enqueue(std::unique_ptr<SerializationData> data) {
base::MutexGuard lock_guard(&mutex_);
if (!live_) return;
data_.emplace_back(std::move(data));
not_empty_.NotifyOne();
data_.push_back(std::move(data));
}
std::unique_ptr<SerializationData> SerializationDataQueue::Dequeue() {
bool SerializationDataQueue::Dequeue(
std::unique_ptr<SerializationData>* out_data) {
out_data->reset();
base::MutexGuard lock_guard(&mutex_);
while (live_ && data_.empty()) not_empty_.Wait(&mutex_);
if (!live_) return {};
auto result = std::move(data_.front());
data_.pop_front();
return result;
if (data_.empty()) return false;
*out_data = std::move(data_[0]);
data_.erase(data_.begin());
return true;
}
void SerializationDataQueue::Kill() {
bool SerializationDataQueue::IsEmpty() {
base::MutexGuard lock_guard(&mutex_);
live_ = false;
data_.clear();
not_empty_.NotifyAll();
return data_.empty();
}
Worker::Worker(const char* script)
: thread_(nullptr), script_(nullptr), running_(false) {
script_ = i::StrDup(script);
void SerializationDataQueue::Clear() {
base::MutexGuard lock_guard(&mutex_);
data_.clear();
}
Worker::Worker()
: in_semaphore_(0),
out_semaphore_(0),
thread_(nullptr),
script_(nullptr),
running_(false) {}
Worker::~Worker() {
delete thread_;
thread_ = nullptr;
delete[] script_;
script_ = nullptr;
in_queue_.Clear();
out_queue_.Clear();
}
void Worker::StartExecuteInThread() {
void Worker::StartExecuteInThread(const char* script) {
running_ = true;
script_ = i::StrDup(script);
thread_ = new WorkerThread(this);
thread_->Start();
}
void Worker::PostMessage(std::unique_ptr<SerializationData> data) {
in_queue_.Enqueue(std::move(data));
in_semaphore_.Signal();
}
std::unique_ptr<SerializationData> Worker::GetMessage() {
return out_queue_.Dequeue();
std::unique_ptr<SerializationData> result;
while (!out_queue_.Dequeue(&result)) {
// If the worker is no longer running, and there are no messages in the
// queue, don't expect any more messages from it.
if (!base::Relaxed_Load(&running_)) break;
out_semaphore_.Wait();
}
return result;
}
void Worker::Terminate() {
in_queue_.Kill();
out_queue_.Kill();
base::Relaxed_Store(&running_, false);
// Post nullptr to wake the Worker thread message loop, and tell it to stop
// running.
PostMessage(nullptr);
}
void Worker::WaitForThread() {
......@@ -2638,8 +2659,12 @@ void Worker::ExecuteInThread() {
Local<Function> onmessage_fun = Local<Function>::Cast(onmessage);
// Now wait for messages
while (true) {
std::unique_ptr<SerializationData> data = in_queue_.Dequeue();
if (!data) break; // nullptr indicates termination.
in_semaphore_.Wait();
std::unique_ptr<SerializationData> data;
if (!in_queue_.Dequeue(&data)) continue;
if (!data) {
break;
}
v8::TryCatch try_catch(isolate);
Local<Value> value;
if (Shell::DeserializeValue(isolate, std::move(data))
......@@ -2661,8 +2686,10 @@ void Worker::ExecuteInThread() {
Shell::CollectGarbage(isolate);
}
isolate->Dispose();
in_queue_.Kill();
out_queue_.Kill();
// Post nullptr to wake the thread waiting on GetMessage() if there is one.
out_queue_.Enqueue(nullptr);
out_semaphore_.Signal();
}
void Worker::PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args) {
......@@ -2683,6 +2710,7 @@ void Worker::PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args) {
Local<External> this_value = Local<External>::Cast(args.Data());
Worker* worker = static_cast<Worker*>(this_value->Value());
worker->out_queue_.Enqueue(std::move(data));
worker->out_semaphore_.Signal();
}
}
......
......@@ -14,7 +14,6 @@
#include <vector>
#include "src/base/once.h"
#include "src/base/platform/condition-variable.h"
#include "src/base/platform/time.h"
#include "src/d8/async-hooks-wrapper.h"
#include "src/strings/string-hasher.h"
......@@ -103,6 +102,9 @@ class SourceGroup {
base::Semaphore done_semaphore_;
base::Thread* thread_;
void ExitShell(int exit_code);
Local<String> ReadFile(Isolate* isolate, const char* name);
const char** argv_;
int begin_offset_;
int end_offset_;
......@@ -193,31 +195,24 @@ class SerializationData {
class SerializationDataQueue {
public:
// Enqueue the given data.
void Enqueue(std::unique_ptr<SerializationData> data);
// Wait for data to arrive. Returns {nullptr} if this queue is {Kill()'d}
// in the course of waiting.
std::unique_ptr<SerializationData> Dequeue();
// Kill this queue, canceling any pending dequeue requests.
void Kill();
bool Dequeue(std::unique_ptr<SerializationData>* data);
bool IsEmpty();
void Clear();
private:
base::Mutex mutex_;
bool live_ = true;
std::deque<std::unique_ptr<SerializationData>> data_;
base::ConditionVariable not_empty_;
std::vector<std::unique_ptr<SerializationData>> data_;
};
class Worker {
public:
explicit Worker(const char* script);
Worker();
~Worker();
// Run this Worker. This function should only be called once, and should only
// be called by the thread that created the Worker.
void StartExecuteInThread();
// Run the given script on this Worker. This function should only be called
// once, and should only be called by the thread that created the Worker.
void StartExecuteInThread(const char* script);
// Post a message to the worker's incoming message queue. The worker will
// take ownership of the SerializationData.
// This function should only be called by the thread that created the Worker.
......@@ -252,10 +247,12 @@ class Worker {
void ExecuteInThread();
static void PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args);
base::Semaphore in_semaphore_;
base::Semaphore out_semaphore_;
SerializationDataQueue in_queue_;
SerializationDataQueue out_queue_;
WorkerThread* thread_;
const char* script_;
base::Thread* thread_;
char* script_;
base::Atomic32 running_;
};
......
// Copyright 2015 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following
// disclaimer in the documentation and/or other materials provided
// with the distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// Verify that the Worker constrcutor by default treats its first argument
// as the filename of a script load and run.
// Resources: test/mjsunit/d8/d8-worker-script.txt
if (this.Worker) {
// Verify that the Worker constructor by default treats its first argument
// as the filename of a script load and run.
var w = new Worker('test/mjsunit/d8/d8-worker-script.txt');
assertEquals("Starting worker", w.getMessage());
w.postMessage("");
assertEquals("DONE", w.getMessage());
w.terminate();
try {
var w = new Worker('test/mjsunit/d8/not-found.txt');
assertFalse(true);
} catch (e) {
// should not be able to find this script.
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment