Commit 28b0129b authored by binji's avatar binji Committed by Commit bot

Fix cluster-fuzz regression when getting message from Worker

The issue is that Worker.prototype.terminate was deleting the C++ Worker
object, and then Worker.prototype.getMessage was trying to read messages from
the queue.

The simplest solution is to keep workers in a zombie state when they have been
terminated. They won't be reaped until Shell::CleanupWorkers is called.

I've also fixed some threading issues with Workers:

* Workers can be created by another Worker, so the Shell::workers_ variable
must be protected by a mutex.

* An individual Worker can typically only be accessed by the isolate that
created it, but the main thread can always terminate it, so the Worker::state_
must be accessed in a thread-safe way.

BUG=chromium:504136
R=jochen@chromium.org
LOG=n

Review URL: https://codereview.chromium.org/1208733002

Cr-Commit-Position: refs/heads/master@{#29306}
parent 803b0c74
...@@ -205,6 +205,8 @@ base::Mutex Shell::context_mutex_; ...@@ -205,6 +205,8 @@ base::Mutex Shell::context_mutex_;
const base::TimeTicks Shell::kInitialTicks = const base::TimeTicks Shell::kInitialTicks =
base::TimeTicks::HighResolutionNow(); base::TimeTicks::HighResolutionNow();
Persistent<Context> Shell::utility_context_; Persistent<Context> Shell::utility_context_;
base::Mutex Shell::workers_mutex_;
bool Shell::allow_new_workers_ = true;
i::List<Worker*> Shell::workers_; i::List<Worker*> Shell::workers_;
i::List<SharedArrayBuffer::Contents> Shell::externalized_shared_contents_; i::List<SharedArrayBuffer::Contents> Shell::externalized_shared_contents_;
#endif // !V8_SHARED #endif // !V8_SHARED
...@@ -693,12 +695,17 @@ void Shell::WorkerNew(const v8::FunctionCallbackInfo<v8::Value>& args) { ...@@ -693,12 +695,17 @@ void Shell::WorkerNew(const v8::FunctionCallbackInfo<v8::Value>& args) {
return; return;
} }
Worker* worker = new Worker; {
args.This()->SetInternalField(0, External::New(isolate, worker)); base::LockGuard<base::Mutex> lock_guard(&workers_mutex_);
workers_.Add(worker); if (!allow_new_workers_) return;
Worker* worker = new Worker;
args.This()->SetInternalField(0, External::New(isolate, worker));
workers_.Add(worker);
String::Utf8Value function_string(args[0]->ToString()); String::Utf8Value function_string(args[0]->ToString());
worker->StartExecuteInThread(isolate, *function_string); worker->StartExecuteInThread(isolate, *function_string);
}
} }
...@@ -793,8 +800,6 @@ void Shell::WorkerTerminate(const v8::FunctionCallbackInfo<v8::Value>& args) { ...@@ -793,8 +800,6 @@ void Shell::WorkerTerminate(const v8::FunctionCallbackInfo<v8::Value>& args) {
Worker* worker = Worker* worker =
static_cast<Worker*>(Local<External>::Cast(this_value)->Value()); static_cast<Worker*>(Local<External>::Cast(this_value)->Value());
worker->Terminate(); worker->Terminate();
workers_.RemoveElement(worker);
delete worker;
} }
#endif // !V8_SHARED #endif // !V8_SHARED
...@@ -1620,7 +1625,11 @@ void SerializationDataQueue::Clear() { ...@@ -1620,7 +1625,11 @@ void SerializationDataQueue::Clear() {
Worker::Worker() Worker::Worker()
: in_semaphore_(0), out_semaphore_(0), thread_(NULL), script_(NULL) {} : in_semaphore_(0),
out_semaphore_(0),
thread_(NULL),
script_(NULL),
state_(IDLE) {}
Worker::~Worker() { Cleanup(); } Worker::~Worker() { Cleanup(); }
...@@ -1628,11 +1637,7 @@ Worker::~Worker() { Cleanup(); } ...@@ -1628,11 +1637,7 @@ Worker::~Worker() { Cleanup(); }
void Worker::StartExecuteInThread(Isolate* isolate, void Worker::StartExecuteInThread(Isolate* isolate,
const char* function_string) { const char* function_string) {
if (thread_) { DCHECK(base::NoBarrier_Load(&state_) == IDLE);
Throw(isolate, "Only one worker allowed");
return;
}
static const char format[] = "(%s).call(this);"; static const char format[] = "(%s).call(this);";
size_t len = strlen(function_string) + sizeof(format); size_t len = strlen(function_string) + sizeof(format);
...@@ -1640,6 +1645,7 @@ void Worker::StartExecuteInThread(Isolate* isolate, ...@@ -1640,6 +1645,7 @@ void Worker::StartExecuteInThread(Isolate* isolate,
i::Vector<char> vec(script_, static_cast<int>(len + 1)); i::Vector<char> vec(script_, static_cast<int>(len + 1));
i::SNPrintF(vec, format, function_string); i::SNPrintF(vec, format, function_string);
base::NoBarrier_Store(&state_, RUNNING);
thread_ = new WorkerThread(this); thread_ = new WorkerThread(this);
thread_->Start(); thread_->Start();
} }
...@@ -1652,8 +1658,9 @@ void Worker::PostMessage(SerializationData* data) { ...@@ -1652,8 +1658,9 @@ void Worker::PostMessage(SerializationData* data) {
SerializationData* Worker::GetMessage() { SerializationData* Worker::GetMessage() {
SerializationData* data; SerializationData* data = NULL;
while (!out_queue_.Dequeue(&data)) { while (!out_queue_.Dequeue(&data)) {
if (base::NoBarrier_Load(&state_) != RUNNING) break;
out_semaphore_.Wait(); out_semaphore_.Wait();
} }
...@@ -1662,10 +1669,11 @@ SerializationData* Worker::GetMessage() { ...@@ -1662,10 +1669,11 @@ SerializationData* Worker::GetMessage() {
void Worker::Terminate() { void Worker::Terminate() {
if (thread_ == NULL) return; if (base::NoBarrier_CompareAndSwap(&state_, RUNNING, TERMINATED) == RUNNING) {
PostMessage(NULL); // Post NULL to wake the Worker thread message loop.
thread_->Join(); PostMessage(NULL);
Cleanup(); thread_->Join();
}
} }
...@@ -1698,31 +1706,31 @@ void Worker::ExecuteInThread() { ...@@ -1698,31 +1706,31 @@ void Worker::ExecuteInThread() {
// First run the script // First run the script
Handle<String> file_name = String::NewFromUtf8(isolate, "unnamed"); Handle<String> file_name = String::NewFromUtf8(isolate, "unnamed");
Handle<String> source = String::NewFromUtf8(isolate, script_); Handle<String> source = String::NewFromUtf8(isolate, script_);
Shell::ExecuteString(isolate, source, file_name, false, true); if (Shell::ExecuteString(isolate, source, file_name, true, true)) {
// Get the message handler
// Get the message handler Handle<Value> onmessage =
Handle<Value> onmessage = global->Get(String::NewFromUtf8(isolate, "onmessage"));
global->Get(String::NewFromUtf8(isolate, "onmessage")); if (onmessage->IsFunction()) {
if (onmessage->IsFunction()) { Handle<Function> onmessage_fun = Handle<Function>::Cast(onmessage);
Handle<Function> onmessage_fun = Handle<Function>::Cast(onmessage); // Now wait for messages
// Now wait for messages bool done = false;
bool done = false; while (!done) {
while (!done) { in_semaphore_.Wait();
in_semaphore_.Wait(); SerializationData* data;
SerializationData* data; if (!in_queue_.Dequeue(&data)) continue;
if (!in_queue_.Dequeue(&data)) continue; if (data == NULL) {
if (data == NULL) { done = true;
done = true; break;
break; }
int offset = 0;
Local<Value> data_value;
if (Shell::DeserializeValue(isolate, *data, &offset)
.ToLocal(&data_value)) {
Handle<Value> argv[] = {data_value};
(void)onmessage_fun->Call(context, global, 1, argv);
}
delete data;
} }
int offset = 0;
Local<Value> data_value;
if (Shell::DeserializeValue(isolate, *data, &offset)
.ToLocal(&data_value)) {
Handle<Value> argv[] = {data_value};
(void)onmessage_fun->Call(context, global, 1, argv);
}
delete data;
} }
} }
} }
...@@ -1730,6 +1738,12 @@ void Worker::ExecuteInThread() { ...@@ -1730,6 +1738,12 @@ void Worker::ExecuteInThread() {
Shell::CollectGarbage(isolate); Shell::CollectGarbage(isolate);
} }
isolate->Dispose(); isolate->Dispose();
if (base::NoBarrier_CompareAndSwap(&state_, RUNNING, TERMINATED) == RUNNING) {
// Post NULL to wake the thread waiting on GetMessage() if there is one.
out_queue_.Enqueue(NULL);
out_semaphore_.Signal();
}
} }
...@@ -2191,12 +2205,28 @@ MaybeLocal<Value> Shell::DeserializeValue(Isolate* isolate, ...@@ -2191,12 +2205,28 @@ MaybeLocal<Value> Shell::DeserializeValue(Isolate* isolate,
void Shell::CleanupWorkers() { void Shell::CleanupWorkers() {
for (int i = 0; i < workers_.length(); ++i) { // Make a copy of workers_, because we don't want to call Worker::Terminate
Worker* worker = workers_[i]; // while holding the workers_mutex_ lock. Otherwise, if a worker is about to
// create a new Worker, it would deadlock.
i::List<Worker*> workers_copy;
{
base::LockGuard<base::Mutex> lock_guard(&workers_mutex_);
allow_new_workers_ = false;
workers_copy.AddAll(workers_);
workers_.Clear();
}
for (int i = 0; i < workers_copy.length(); ++i) {
Worker* worker = workers_copy[i];
worker->Terminate(); worker->Terminate();
delete worker; delete worker;
} }
workers_.Clear();
// Now that all workers are terminated, we can re-enable Worker creation.
{
base::LockGuard<base::Mutex> lock_guard(&workers_mutex_);
allow_new_workers_ = true;
}
for (int i = 0; i < externalized_shared_contents_.length(); ++i) { for (int i = 0; i < externalized_shared_contents_.length(); ++i) {
const SharedArrayBuffer::Contents& contents = const SharedArrayBuffer::Contents& contents =
......
...@@ -257,6 +257,8 @@ class Worker { ...@@ -257,6 +257,8 @@ class Worker {
Worker* worker_; Worker* worker_;
}; };
enum State { IDLE, RUNNING, TERMINATED };
void ExecuteInThread(); void ExecuteInThread();
void Cleanup(); void Cleanup();
static void PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args); static void PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args);
...@@ -267,6 +269,7 @@ class Worker { ...@@ -267,6 +269,7 @@ class Worker {
SerializationDataQueue out_queue_; SerializationDataQueue out_queue_;
base::Thread* thread_; base::Thread* thread_;
char* script_; char* script_;
base::Atomic32 state_;
}; };
#endif // !V8_SHARED #endif // !V8_SHARED
...@@ -461,6 +464,9 @@ class Shell : public i::AllStatic { ...@@ -461,6 +464,9 @@ class Shell : public i::AllStatic {
static base::OS::MemoryMappedFile* counters_file_; static base::OS::MemoryMappedFile* counters_file_;
static base::Mutex context_mutex_; static base::Mutex context_mutex_;
static const base::TimeTicks kInitialTicks; static const base::TimeTicks kInitialTicks;
static base::Mutex workers_mutex_;
static bool allow_new_workers_;
static i::List<Worker*> workers_; static i::List<Worker*> workers_;
static i::List<SharedArrayBuffer::Contents> externalized_shared_contents_; static i::List<SharedArrayBuffer::Contents> externalized_shared_contents_;
......
// Copyright 2015 the V8 project authors. All rights reserved.
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following
// disclaimer in the documentation and/or other materials provided
// with the distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
if (this.Worker) {
function f() {
var g = function () {
postMessage(42);
};
var w = new Worker(g);
onmessage = function(parentMsg) {
w.postMessage(parentMsg);
var childMsg = w.getMessage();
postMessage(childMsg);
};
}
var w = new Worker(f);
w.postMessage(9);
assertEquals(42, w.getMessage());
}
...@@ -128,4 +128,12 @@ if (this.Worker) { ...@@ -128,4 +128,12 @@ if (this.Worker) {
assertEquals("DONE", w.getMessage()); assertEquals("DONE", w.getMessage());
w.terminate(); w.terminate();
// Make sure that the main thread doesn't block forever in getMessage() if
// the worker dies without posting a message.
function f2() {}
var w2 = new Worker(f2);
var msg = w2.getMessage();
assertEquals(undefined, msg);
} }
// Copyright 2015 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
if (this.Worker) {
function __f_5() {}
var __v_10 = new Worker(__f_5);
__v_10.terminate();
__v_10.getMessage();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment