Commit 5a9722b2 authored by binji's avatar binji Committed by Commit bot

d8 workers: Fix transferring SharedArrayBuffer to multiple Workers. (try 2)

Note: the previous try was reverted for occasional flaky tests. This continued
after the revert, and should be fixed by
https://codereview.chromium.org/1226143003.

Previously, the serialization code would call Externalize for every transferred
ArrayBuffer or SharedArrayBuffer, but that function can only be called once. If
the buffer is already externalized, we should call GetContents instead.

Also fix use-after-free bug when transferring ArrayBuffers. The transferred
ArrayBuffer must be internalized in the new isolate, or be managed by the
Shell. The current code gives it to the isolate externalized and frees it
immediately afterward when the SerializationData object is destroyed.

BUG=chromium:497295
R=jarin@chromium.org
LOG=n

Review URL: https://codereview.chromium.org/1223813008

Cr-Commit-Position: refs/heads/master@{#29658}
parent 8735d38e
...@@ -1537,22 +1537,26 @@ void SourceGroup::WaitForThread() { ...@@ -1537,22 +1537,26 @@ void SourceGroup::WaitForThread() {
SerializationData::~SerializationData() { SerializationData::~SerializationData() {
// Any ArrayBuffer::Contents are owned by this SerializationData object. // Any ArrayBuffer::Contents are owned by this SerializationData object if
// SharedArrayBuffer::Contents may be used by other threads, so must be // ownership hasn't been transferred out via ReadArrayBufferContents.
// SharedArrayBuffer::Contents may be used by multiple threads, so must be
// cleaned up by the main thread in Shell::CleanupWorkers(). // cleaned up by the main thread in Shell::CleanupWorkers().
for (int i = 0; i < array_buffer_contents.length(); ++i) { for (int i = 0; i < array_buffer_contents_.length(); ++i) {
ArrayBuffer::Contents& contents = array_buffer_contents[i]; ArrayBuffer::Contents& contents = array_buffer_contents_[i];
Shell::array_buffer_allocator->Free(contents.Data(), contents.ByteLength()); if (contents.Data()) {
Shell::array_buffer_allocator->Free(contents.Data(),
contents.ByteLength());
}
} }
} }
void SerializationData::WriteTag(SerializationTag tag) { data.Add(tag); } void SerializationData::WriteTag(SerializationTag tag) { data_.Add(tag); }
void SerializationData::WriteMemory(const void* p, int length) { void SerializationData::WriteMemory(const void* p, int length) {
if (length > 0) { if (length > 0) {
i::Vector<uint8_t> block = data.AddBlock(0, length); i::Vector<uint8_t> block = data_.AddBlock(0, length);
memcpy(&block[0], p, length); memcpy(&block[0], p, length);
} }
} }
...@@ -1560,18 +1564,18 @@ void SerializationData::WriteMemory(const void* p, int length) { ...@@ -1560,18 +1564,18 @@ void SerializationData::WriteMemory(const void* p, int length) {
void SerializationData::WriteArrayBufferContents( void SerializationData::WriteArrayBufferContents(
const ArrayBuffer::Contents& contents) { const ArrayBuffer::Contents& contents) {
array_buffer_contents.Add(contents); array_buffer_contents_.Add(contents);
WriteTag(kSerializationTagTransferredArrayBuffer); WriteTag(kSerializationTagTransferredArrayBuffer);
int index = array_buffer_contents.length() - 1; int index = array_buffer_contents_.length() - 1;
Write(index); Write(index);
} }
void SerializationData::WriteSharedArrayBufferContents( void SerializationData::WriteSharedArrayBufferContents(
const SharedArrayBuffer::Contents& contents) { const SharedArrayBuffer::Contents& contents) {
shared_array_buffer_contents.Add(contents); shared_array_buffer_contents_.Add(contents);
WriteTag(kSerializationTagTransferredSharedArrayBuffer); WriteTag(kSerializationTagTransferredSharedArrayBuffer);
int index = shared_array_buffer_contents.length() - 1; int index = shared_array_buffer_contents_.length() - 1;
Write(index); Write(index);
} }
...@@ -1583,7 +1587,7 @@ SerializationTag SerializationData::ReadTag(int* offset) const { ...@@ -1583,7 +1587,7 @@ SerializationTag SerializationData::ReadTag(int* offset) const {
void SerializationData::ReadMemory(void* p, int length, int* offset) const { void SerializationData::ReadMemory(void* p, int length, int* offset) const {
if (length > 0) { if (length > 0) {
memcpy(p, &data[*offset], length); memcpy(p, &data_[*offset], length);
(*offset) += length; (*offset) += length;
} }
} }
...@@ -1592,16 +1596,20 @@ void SerializationData::ReadMemory(void* p, int length, int* offset) const { ...@@ -1592,16 +1596,20 @@ void SerializationData::ReadMemory(void* p, int length, int* offset) const {
void SerializationData::ReadArrayBufferContents(ArrayBuffer::Contents* contents, void SerializationData::ReadArrayBufferContents(ArrayBuffer::Contents* contents,
int* offset) const { int* offset) const {
int index = Read<int>(offset); int index = Read<int>(offset);
DCHECK(index < array_buffer_contents.length()); DCHECK(index < array_buffer_contents_.length());
*contents = array_buffer_contents[index]; *contents = array_buffer_contents_[index];
// Ownership of this ArrayBuffer::Contents is passed to the caller. Neuter
// our copy so it won't be double-free'd when this SerializationData is
// destroyed.
array_buffer_contents_[index] = ArrayBuffer::Contents();
} }
void SerializationData::ReadSharedArrayBufferContents( void SerializationData::ReadSharedArrayBufferContents(
SharedArrayBuffer::Contents* contents, int* offset) const { SharedArrayBuffer::Contents* contents, int* offset) const {
int index = Read<int>(offset); int index = Read<int>(offset);
DCHECK(index < shared_array_buffer_contents.length()); DCHECK(index < shared_array_buffer_contents_.length());
*contents = shared_array_buffer_contents[index]; *contents = shared_array_buffer_contents_[index];
} }
...@@ -1644,7 +1652,14 @@ Worker::Worker() ...@@ -1644,7 +1652,14 @@ Worker::Worker()
join_called_(false) {} join_called_(false) {}
Worker::~Worker() { Cleanup(); } Worker::~Worker() {
delete thread_;
thread_ = NULL;
delete[] script_;
script_ = NULL;
in_queue_.Clear();
out_queue_.Clear();
}
void Worker::StartExecuteInThread(Isolate* isolate, const char* script) { void Worker::StartExecuteInThread(Isolate* isolate, const char* script) {
...@@ -1673,7 +1688,6 @@ SerializationData* Worker::GetMessage() { ...@@ -1673,7 +1688,6 @@ SerializationData* Worker::GetMessage() {
if (base::NoBarrier_Load(&state_) != RUNNING) break; if (base::NoBarrier_Load(&state_) != RUNNING) break;
out_semaphore_.Wait(); out_semaphore_.Wait();
} }
return data; return data;
} }
...@@ -1765,16 +1779,6 @@ void Worker::ExecuteInThread() { ...@@ -1765,16 +1779,6 @@ void Worker::ExecuteInThread() {
} }
void Worker::Cleanup() {
delete thread_;
thread_ = NULL;
delete[] script_;
script_ = NULL;
in_queue_.Clear();
out_queue_.Clear();
}
void Worker::PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args) { void Worker::PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args) {
Isolate* isolate = args.GetIsolate(); Isolate* isolate = args.GetIsolate();
HandleScope handle_scope(isolate); HandleScope handle_scope(isolate);
...@@ -2059,7 +2063,9 @@ bool Shell::SerializeValue(Isolate* isolate, Handle<Value> value, ...@@ -2059,7 +2063,9 @@ bool Shell::SerializeValue(Isolate* isolate, Handle<Value> value,
return false; return false;
} }
ArrayBuffer::Contents contents = array_buffer->Externalize(); ArrayBuffer::Contents contents = array_buffer->IsExternal()
? array_buffer->GetContents()
: array_buffer->Externalize();
array_buffer->Neuter(); array_buffer->Neuter();
out_data->WriteArrayBufferContents(contents); out_data->WriteArrayBufferContents(contents);
} else { } else {
...@@ -2088,9 +2094,14 @@ bool Shell::SerializeValue(Isolate* isolate, Handle<Value> value, ...@@ -2088,9 +2094,14 @@ bool Shell::SerializeValue(Isolate* isolate, Handle<Value> value,
return false; return false;
} }
SharedArrayBuffer::Contents contents = sab->Externalize(); SharedArrayBuffer::Contents contents;
if (sab->IsExternal()) {
contents = sab->GetContents();
} else {
contents = sab->Externalize();
externalized_shared_contents_.Add(contents);
}
out_data->WriteSharedArrayBufferContents(contents); out_data->WriteSharedArrayBufferContents(contents);
externalized_shared_contents_.Add(contents);
} else if (value->IsObject()) { } else if (value->IsObject()) {
Handle<Object> object = Handle<Object>::Cast(value); Handle<Object> object = Handle<Object>::Cast(value);
if (FindInObjectList(object, *seen_objects)) { if (FindInObjectList(object, *seen_objects)) {
...@@ -2203,8 +2214,8 @@ MaybeLocal<Value> Shell::DeserializeValue(Isolate* isolate, ...@@ -2203,8 +2214,8 @@ MaybeLocal<Value> Shell::DeserializeValue(Isolate* isolate,
case kSerializationTagTransferredArrayBuffer: { case kSerializationTagTransferredArrayBuffer: {
ArrayBuffer::Contents contents; ArrayBuffer::Contents contents;
data.ReadArrayBufferContents(&contents, offset); data.ReadArrayBufferContents(&contents, offset);
result = result = ArrayBuffer::New(isolate, contents.Data(), contents.ByteLength(),
ArrayBuffer::New(isolate, contents.Data(), contents.ByteLength()); ArrayBufferCreationMode::kInternalized);
break; break;
} }
case kSerializationTagTransferredSharedArrayBuffer: { case kSerializationTagTransferredSharedArrayBuffer: {
......
...@@ -214,9 +214,9 @@ class SerializationData { ...@@ -214,9 +214,9 @@ class SerializationData {
} }
private: private:
i::List<uint8_t> data; i::List<uint8_t> data_;
i::List<ArrayBuffer::Contents> array_buffer_contents; i::List<ArrayBuffer::Contents> array_buffer_contents_;
i::List<SharedArrayBuffer::Contents> shared_array_buffer_contents; i::List<SharedArrayBuffer::Contents> shared_array_buffer_contents_;
}; };
...@@ -273,7 +273,6 @@ class Worker { ...@@ -273,7 +273,6 @@ class Worker {
}; };
void ExecuteInThread(); void ExecuteInThread();
void Cleanup();
static void PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args); static void PostMessageOut(const v8::FunctionCallbackInfo<v8::Value>& args);
base::Semaphore in_semaphore_; base::Semaphore in_semaphore_;
......
...@@ -27,42 +27,80 @@ ...@@ -27,42 +27,80 @@
// Flags: --harmony-sharedarraybuffer --harmony-atomics // Flags: --harmony-sharedarraybuffer --harmony-atomics
var workerScript =
`onmessage = function(m) {
var sab = m;
var ta = new Uint32Array(sab);
if (sab.byteLength !== 16) {
throw new Error('SharedArrayBuffer transfer byteLength');
}
for (var i = 0; i < 4; ++i) {
if (ta[i] !== i) {
throw new Error('SharedArrayBuffer transfer value ' + i);
}
}
// Atomically update ta[0]
Atomics.store(ta, 0, 100);
};`;
if (this.Worker) { if (this.Worker) {
var w = new Worker(workerScript);
var sab = new SharedArrayBuffer(16); (function TestTransfer() {
var ta = new Uint32Array(sab); var workerScript =
for (var i = 0; i < 4; ++i) { `onmessage = function(m) {
ta[i] = i; var sab = m;
} var ta = new Uint32Array(sab);
if (sab.byteLength !== 16) {
throw new Error('SharedArrayBuffer transfer byteLength');
}
for (var i = 0; i < 4; ++i) {
if (ta[i] !== i) {
throw new Error('SharedArrayBuffer transfer value ' + i);
}
}
// Atomically update ta[0]
Atomics.store(ta, 0, 100);
};`;
var w = new Worker(workerScript);
var sab = new SharedArrayBuffer(16);
var ta = new Uint32Array(sab);
for (var i = 0; i < 4; ++i) {
ta[i] = i;
}
// Transfer SharedArrayBuffer
w.postMessage(sab, [sab]);
assertEquals(16, sab.byteLength); // ArrayBuffer should not be neutered.
// Spinwait for the worker to update ta[0]
var ta0;
while ((ta0 = Atomics.load(ta, 0)) == 0) {}
assertEquals(100, ta0);
w.terminate();
assertEquals(16, sab.byteLength); // Still not neutered.
})();
// Transfer SharedArrayBuffer (function TestTransferMulti() {
w.postMessage(sab, [sab]); var workerScript =
assertEquals(16, sab.byteLength); // ArrayBuffer should not be neutered. `onmessage = function(msg) {
var sab = msg.sab;
var id = msg.id;
var ta = new Uint32Array(sab);
Atomics.store(ta, id, 1);
postMessage(id);
};`;
// Spinwait for the worker to update ta[0] var sab = new SharedArrayBuffer(16);
var ta0; var ta = new Uint32Array(sab);
while ((ta0 = Atomics.load(ta, 0)) == 0) {}
assertEquals(100, ta0); var id;
var workers = [];
for (id = 0; id < 4; ++id) {
workers[id] = new Worker(workerScript);
workers[id].postMessage({sab: sab, id: id}, [sab]);
}
w.terminate(); // Spinwait for each worker to update ta[id]
var count = 0;
while (count < 4) {
for (id = 0; id < 4; ++id) {
if (Atomics.compareExchange(ta, id, 1, -1) == 1) {
// Worker is finished.
assertEquals(id, workers[id].getMessage());
workers[id].terminate();
count++;
}
}
}
})();
assertEquals(16, sab.byteLength); // Still not neutered.
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment