Commit 602de389 authored by Leszek Swirski's avatar Leszek Swirski Committed by V8 LUCI CQ

[parser] Allow cloning chunked streams

This allows streamed sources to also trigger parallel compile tasks. The
chunk vectors are shared via std::shared_ptr.

Clone chunked streams are initialised with a null source, and are not
allowed to fetch any more data. Similarly, the original stream is not
allowed to fetch data if it has been cloned (since the vector is shared
and would mutate if we added more data to it).

This is ok for the purposes of cloning for parallel compile tasks, as we
fully parse before cloning for the task.

Change-Id: Ic268e4956e0894acb63111bf0aaf32eaad426066
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/3310917
Commit-Queue: Leszek Swirski <leszeks@chromium.org>
Reviewed-by: 's avatarToon Verwaest <verwaest@chromium.org>
Cr-Commit-Position: refs/heads/main@{#78209}
parent 05b241c6
......@@ -153,52 +153,47 @@ template <typename Char>
class ChunkedStream {
public:
explicit ChunkedStream(ScriptCompiler::ExternalSourceStream* source)
: source_(source) {}
: source_(source), chunks_(std::make_shared<std::vector<Chunk>>()) {}
ChunkedStream(const ChunkedStream&) V8_NOEXCEPT {
// TODO(rmcilroy): Implement cloning for chunked streams.
UNREACHABLE();
}
ChunkedStream(const ChunkedStream& other) V8_NOEXCEPT
: source_(nullptr),
chunks_(other.chunks_) {}
// The no_gc argument is only here because of the templated way this class
// is used along with other implementations that require V8 heap access.
Range<Char> GetDataAt(size_t pos, RuntimeCallStats* stats,
DisallowGarbageCollection* no_gc = nullptr) {
Chunk chunk = FindChunk(pos, stats);
Chunk& chunk = FindChunk(pos, stats);
size_t buffer_end = chunk.length;
size_t buffer_pos = std::min(buffer_end, pos - chunk.position);
return {&chunk.data[buffer_pos], &chunk.data[buffer_end]};
}
~ChunkedStream() {
for (Chunk& chunk : chunks_) delete[] chunk.data;
return {&chunk.data.get()[buffer_pos], &chunk.data.get()[buffer_end]};
}
static const bool kCanBeCloned = false;
static const bool kCanBeCloned = true;
static const bool kCanAccessHeap = false;
private:
struct Chunk {
Chunk(const Char* const data, size_t position, size_t length)
: data(data), position(position), length(length) {}
const Char* const data;
std::unique_ptr<const Char[]> data;
// The logical position of data.
const size_t position;
const size_t length;
size_t end_position() const { return position + length; }
};
Chunk FindChunk(size_t position, RuntimeCallStats* stats) {
while (V8_UNLIKELY(chunks_.empty())) FetchChunk(size_t{0}, stats);
Chunk& FindChunk(size_t position, RuntimeCallStats* stats) {
while (V8_UNLIKELY(chunks_->empty())) FetchChunk(size_t{0}, stats);
// Walk forwards while the position is in front of the current chunk.
while (position >= chunks_.back().end_position() &&
chunks_.back().length > 0) {
FetchChunk(chunks_.back().end_position(), stats);
while (position >= chunks_->back().end_position() &&
chunks_->back().length > 0) {
FetchChunk(chunks_->back().end_position(), stats);
}
// Walk backwards.
for (auto reverse_it = chunks_.rbegin(); reverse_it != chunks_.rend();
for (auto reverse_it = chunks_->rbegin(); reverse_it != chunks_->rend();
++reverse_it) {
if (reverse_it->position <= position) return *reverse_it;
}
......@@ -210,11 +205,15 @@ class ChunkedStream {
size_t length) {
// Incoming data has to be aligned to Char size.
DCHECK_EQ(0, length % sizeof(Char));
chunks_.emplace_back(reinterpret_cast<const Char*>(data), position,
chunks_->emplace_back(reinterpret_cast<const Char*>(data), position,
length / sizeof(Char));
}
void FetchChunk(size_t position, RuntimeCallStats* stats) {
// Cloned ChunkedStreams have a null source, and therefore can't fetch any
// new data.
DCHECK_NOT_NULL(source_);
const uint8_t* data = nullptr;
size_t length;
{
......@@ -227,7 +226,7 @@ class ChunkedStream {
ScriptCompiler::ExternalSourceStream* source_;
protected:
std::vector<struct Chunk> chunks_;
std::shared_ptr<std::vector<struct Chunk>> chunks_;
};
// Provides a buffered utf-16 view on the bytes from the underlying ByteStream.
......@@ -522,18 +521,17 @@ class Utf8ExternalStreamingStream final : public BufferedUtf16CharacterStream {
public:
Utf8ExternalStreamingStream(
ScriptCompiler::ExternalSourceStream* source_stream)
: current_({0, {0, 0, 0, unibrow::Utf8::State::kAccept}}),
: chunks_(std::make_shared<std::vector<Chunk>>()),
current_({0, {0, 0, 0, unibrow::Utf8::State::kAccept}}),
source_stream_(source_stream) {}
~Utf8ExternalStreamingStream() final {
for (const Chunk& chunk : chunks_) delete[] chunk.data;
}
~Utf8ExternalStreamingStream() final = default;
bool can_access_heap() const final { return false; }
bool can_be_cloned() const final { return false; }
bool can_be_cloned() const final { return true; }
std::unique_ptr<Utf16CharacterStream> Clone() const override {
UNREACHABLE();
return std::make_unique<Utf8ExternalStreamingStream>(*this);
}
protected:
......@@ -563,11 +561,20 @@ class Utf8ExternalStreamingStream final : public BufferedUtf16CharacterStream {
// - The chunk data (data pointer and length), and
// - the position at the first byte of the chunk.
struct Chunk {
const uint8_t* data;
Chunk(const uint8_t* data, size_t length, StreamPosition start)
: data(data), length(length), start(start) {}
std::unique_ptr<const uint8_t[]> data;
size_t length;
StreamPosition start;
};
friend std::unique_ptr<Utf8ExternalStreamingStream> std::make_unique<
Utf8ExternalStreamingStream>(const Utf8ExternalStreamingStream&);
Utf8ExternalStreamingStream(const Utf8ExternalStreamingStream& source_stream)
V8_NOEXCEPT : chunks_(source_stream.chunks_),
current_({0, {0, 0, 0, unibrow::Utf8::State::kAccept}}),
source_stream_(nullptr) {}
// Within the current chunk, skip forward from current_ towards position.
bool SkipToPosition(size_t position);
// Within the current chunk, fill the buffer_ (while it has capacity).
......@@ -578,7 +585,9 @@ class Utf8ExternalStreamingStream final : public BufferedUtf16CharacterStream {
// (This call is potentially expensive.)
void SearchPosition(size_t position);
std::vector<Chunk> chunks_;
Chunk& GetChunk(size_t chunk_no) { return (*chunks_)[chunk_no]; }
std::shared_ptr<std::vector<Chunk>> chunks_;
Position current_;
ScriptCompiler::ExternalSourceStream* source_stream_;
};
......@@ -589,14 +598,14 @@ bool Utf8ExternalStreamingStream::SkipToPosition(size_t position) {
// Already there? Then return immediately.
if (current_.pos.chars == position) return true;
const Chunk& chunk = chunks_[current_.chunk_no];
const Chunk& chunk = GetChunk(current_.chunk_no);
DCHECK(current_.pos.bytes >= chunk.start.bytes);
unibrow::Utf8::State state = chunk.start.state;
uint32_t incomplete_char = chunk.start.incomplete_char;
size_t it = current_.pos.bytes - chunk.start.bytes;
const uint8_t* cursor = &chunk.data[it];
const uint8_t* end = &chunk.data[chunk.length];
const uint8_t* cursor = &chunk.data.get()[it];
const uint8_t* end = &chunk.data.get()[chunk.length];
size_t chars = current_.pos.chars;
......@@ -622,7 +631,7 @@ bool Utf8ExternalStreamingStream::SkipToPosition(size_t position) {
}
}
current_.pos.bytes = chunk.start.bytes + (cursor - chunk.data);
current_.pos.bytes = chunk.start.bytes + (cursor - chunk.data.get());
current_.pos.chars = chars;
current_.pos.incomplete_char = incomplete_char;
current_.pos.state = state;
......@@ -632,11 +641,11 @@ bool Utf8ExternalStreamingStream::SkipToPosition(size_t position) {
}
void Utf8ExternalStreamingStream::FillBufferFromCurrentChunk() {
DCHECK_LT(current_.chunk_no, chunks_.size());
DCHECK_LT(current_.chunk_no, chunks_->size());
DCHECK_EQ(buffer_start_, buffer_cursor_);
DCHECK_LT(buffer_end_ + 1, buffer_start_ + kBufferSize);
const Chunk& chunk = chunks_[current_.chunk_no];
const Chunk& chunk = GetChunk(current_.chunk_no);
// The buffer_ is writable, but buffer_*_ members are const. So we get a
// non-const pointer into buffer that points to the same char as buffer_end_.
......@@ -662,8 +671,8 @@ void Utf8ExternalStreamingStream::FillBufferFromCurrentChunk() {
}
size_t it = current_.pos.bytes - chunk.start.bytes;
const uint8_t* cursor = chunk.data + it;
const uint8_t* end = chunk.data + chunk.length;
const uint8_t* cursor = chunk.data.get() + it;
const uint8_t* end = chunk.data.get() + chunk.length;
// Deal with possible BOM.
if (V8_UNLIKELY(current_.pos.bytes < 3 && current_.pos.chars == 0)) {
......@@ -711,7 +720,7 @@ void Utf8ExternalStreamingStream::FillBufferFromCurrentChunk() {
output_cursor += ascii_length;
}
current_.pos.bytes = chunk.start.bytes + (cursor - chunk.data);
current_.pos.bytes = chunk.start.bytes + (cursor - chunk.data.get());
current_.pos.chars += (output_cursor - buffer_end_);
current_.pos.incomplete_char = incomplete_char;
current_.pos.state = state;
......@@ -722,12 +731,20 @@ void Utf8ExternalStreamingStream::FillBufferFromCurrentChunk() {
bool Utf8ExternalStreamingStream::FetchChunk() {
RCS_SCOPE(runtime_call_stats(), RuntimeCallCounterId::kGetMoreDataCallback);
DCHECK_EQ(current_.chunk_no, chunks_.size());
DCHECK(chunks_.empty() || chunks_.back().length != 0);
DCHECK_EQ(current_.chunk_no, chunks_->size());
DCHECK(chunks_->empty() || chunks_->back().length != 0);
// Clone Utf8ExternalStreamingStreams have a null source stream, and
// therefore can't fetch any new data.
DCHECK_NOT_NULL(source_stream_);
// Utf8ExternalStreamingStreams that have been cloned are not allowed to fetch
// any more.
DCHECK_EQ(chunks_.use_count(), 1);
const uint8_t* chunk = nullptr;
size_t length = source_stream_->GetMoreData(&chunk);
chunks_.push_back({chunk, length, current_.pos});
chunks_->emplace_back(chunk, length, current_.pos);
return length > 0;
}
......@@ -738,8 +755,8 @@ void Utf8ExternalStreamingStream::SearchPosition(size_t position) {
// FillBuffer right after the current buffer.
if (current_.pos.chars == position) return;
// No chunks. Fetch at least one, so we can assume !chunks_.empty() below.
if (chunks_.empty()) {
// No chunks. Fetch at least one, so we can assume !chunks_->empty() below.
if (chunks_->empty()) {
DCHECK_EQ(current_.chunk_no, 0u);
DCHECK_EQ(current_.pos.bytes, 0u);
DCHECK_EQ(current_.pos.chars, 0u);
......@@ -748,38 +765,39 @@ void Utf8ExternalStreamingStream::SearchPosition(size_t position) {
// Search for the last chunk whose start position is less or equal to
// position.
size_t chunk_no = chunks_.size() - 1;
while (chunk_no > 0 && chunks_[chunk_no].start.chars > position) {
size_t chunk_no = chunks_->size() - 1;
while (chunk_no > 0 && GetChunk(chunk_no).start.chars > position) {
chunk_no--;
}
// Did we find the terminating (zero-length) chunk? Then we're seeking
// behind the end of the data, and position does not exist.
// Set current_ to point to the terminating chunk.
if (chunks_[chunk_no].length == 0) {
current_ = {chunk_no, chunks_[chunk_no].start};
if (GetChunk(chunk_no).length == 0) {
current_ = {chunk_no, GetChunk(chunk_no).start};
return;
}
// Did we find the non-last chunk? Then our position must be within chunk_no.
if (chunk_no + 1 < chunks_.size()) {
if (chunk_no + 1 < chunks_->size()) {
// Fancy-pants optimization for ASCII chunks within a utf-8 stream.
// (Many web sites declare utf-8 encoding, but use only (or almost only) the
// ASCII subset for their JavaScript sources. We can exploit this, by
// checking whether the # bytes in a chunk are equal to the # chars, and if
// so avoid the expensive SkipToPosition.)
bool ascii_only_chunk =
chunks_[chunk_no].start.incomplete_char == 0 &&
(chunks_[chunk_no + 1].start.bytes - chunks_[chunk_no].start.bytes) ==
(chunks_[chunk_no + 1].start.chars - chunks_[chunk_no].start.chars);
GetChunk(chunk_no).start.incomplete_char == 0 &&
(GetChunk(chunk_no + 1).start.bytes - GetChunk(chunk_no).start.bytes) ==
(GetChunk(chunk_no + 1).start.chars -
GetChunk(chunk_no).start.chars);
if (ascii_only_chunk) {
size_t skip = position - chunks_[chunk_no].start.chars;
size_t skip = position - GetChunk(chunk_no).start.chars;
current_ = {chunk_no,
{chunks_[chunk_no].start.bytes + skip,
chunks_[chunk_no].start.chars + skip, 0,
{GetChunk(chunk_no).start.bytes + skip,
GetChunk(chunk_no).start.chars + skip, 0,
unibrow::Utf8::State::kAccept}};
} else {
current_ = {chunk_no, chunks_[chunk_no].start};
current_ = {chunk_no, GetChunk(chunk_no).start};
SkipToPosition(position);
}
......@@ -792,12 +810,12 @@ void Utf8ExternalStreamingStream::SearchPosition(size_t position) {
// What's left: We're in the last, non-terminating chunk. Our position
// may be in the chunk, but it may also be in 'future' chunks, which we'll
// have to obtain.
DCHECK_EQ(chunk_no, chunks_.size() - 1);
current_ = {chunk_no, chunks_[chunk_no].start};
DCHECK_EQ(chunk_no, chunks_->size() - 1);
current_ = {chunk_no, GetChunk(chunk_no).start};
bool have_more_data = true;
bool found = SkipToPosition(position);
while (have_more_data && !found) {
DCHECK_EQ(current_.chunk_no, chunks_.size());
DCHECK_EQ(current_.chunk_no, chunks_->size());
have_more_data = FetchChunk();
found = have_more_data && SkipToPosition(position);
}
......@@ -805,9 +823,9 @@ void Utf8ExternalStreamingStream::SearchPosition(size_t position) {
// We'll return with a postion != the desired position only if we're out
// of data. In that case, we'll point to the terminating chunk.
DCHECK_EQ(found, current_.pos.chars == position);
DCHECK_EQ(have_more_data, chunks_.back().length != 0);
DCHECK_EQ(have_more_data, chunks_->back().length != 0);
DCHECK_IMPLIES(!found, !have_more_data);
DCHECK_IMPLIES(!found, current_.chunk_no == chunks_.size() - 1);
DCHECK_IMPLIES(!found, current_.chunk_no == chunks_->size() - 1);
}
size_t Utf8ExternalStreamingStream::FillBuffer(size_t position) {
......@@ -815,8 +833,8 @@ size_t Utf8ExternalStreamingStream::FillBuffer(size_t position) {
buffer_end_ = buffer_;
SearchPosition(position);
bool out_of_data = current_.chunk_no != chunks_.size() &&
chunks_[current_.chunk_no].length == 0 &&
bool out_of_data = current_.chunk_no != chunks_->size() &&
GetChunk(current_.chunk_no).length == 0 &&
current_.pos.incomplete_char == 0;
if (out_of_data) return 0;
......@@ -826,7 +844,7 @@ size_t Utf8ExternalStreamingStream::FillBuffer(size_t position) {
// can't guarantee progress with one chunk. Thus we iterate.)
while (!out_of_data && buffer_cursor_ == buffer_end_) {
// At end of current data, but there might be more? Then fetch it.
if (current_.chunk_no == chunks_.size()) {
if (current_.chunk_no == chunks_->size()) {
out_of_data = !FetchChunk();
}
FillBufferFromCurrentChunk();
......
......@@ -15,10 +15,12 @@ namespace {
// This will take each string as one chunk. The last chunk must be empty.
class ChunkSource : public v8::ScriptCompiler::ExternalSourceStream {
public:
explicit ChunkSource(const char** chunks) : current_(0) {
template <typename Char>
explicit ChunkSource(const Char** chunks) : current_(0) {
do {
chunks_.push_back(
{reinterpret_cast<const uint8_t*>(*chunks), strlen(*chunks)});
chunks_.push_back({reinterpret_cast<const uint8_t*>(*chunks),
(sizeof(Char) / sizeof(uint8_t)) *
std::char_traits<Char>::length(*chunks)});
chunks++;
} while (chunks_.back().len > 0);
}
......@@ -249,7 +251,7 @@ TEST(Utf8SplitBOM) {
TEST(Utf8SplitMultiBOM) {
// Construct chunks with a split BOM followed by another split BOM.
const char* chunks = "\xef\xbb\0\xbf\xef\xbb\0\xbf\0\0";
const char* chunks[] = {"\xef\xbb", "\xbf\xef\xbb", "\xbf", ""};
ChunkSource chunk_source(chunks);
std::unique_ptr<i::Utf16CharacterStream> stream(
v8::internal::ScannerStream::For(
......@@ -463,21 +465,26 @@ void TestCharacterStream(const char* reference, i::Utf16CharacterStream* stream,
void TestCloneCharacterStream(const char* reference,
i::Utf16CharacterStream* stream,
unsigned length) {
std::unique_ptr<i::Utf16CharacterStream> clone = stream->Clone();
// Test original stream through to the end.
TestCharacterStream(reference, stream, length, 0, length);
unsigned i;
unsigned halfway = length / 2;
// Advance original half way.
for (i = 0; i < halfway; i++) {
CHECK_EQU(i, stream->pos());
CHECK_EQU(reference[i], stream->Advance());
}
// Clone the stream after it completes.
std::unique_ptr<i::Utf16CharacterStream> clone = stream->Clone();
// Test advancing original stream didn't affect the clone.
// Test that the clone through to the end.
TestCharacterStream(reference, clone.get(), length, 0, length);
// Test advancing clone didn't affect original stream.
TestCharacterStream(reference, stream, length, i, length);
// Rewind original stream to a third.
stream->Seek(length / 3);
// Rewind clone stream to two thirds.
clone->Seek(2 * length / 3);
// Test seeking clone didn't affect original stream.
TestCharacterStream(reference, stream, length, length / 3, length);
// Test seeking original stream didn't affect clone.
TestCharacterStream(reference, clone.get(), length, 2 * length / 3, length);
}
#undef CHECK_EQU
......@@ -879,7 +886,7 @@ TEST(CloneCharacterStreams) {
->SetResource(isolate, nullptr);
}
// Relocatinable streams aren't clonable.
// Relocatable streams are't clonable.
{
std::unique_ptr<i::Utf16CharacterStream> string_stream(
i::ScannerStream::For(isolate, one_byte_string, 0, length));
......@@ -892,23 +899,31 @@ TEST(CloneCharacterStreams) {
CHECK(!two_byte_string_stream->can_be_cloned());
}
// Chunk sources currently not cloneable.
// Chunk sources are cloneable.
{
const char* chunks[] = {"1234", "\0"};
const char* chunks[] = {"1234", "5678", ""};
ChunkSource chunk_source(chunks);
std::unique_ptr<i::Utf16CharacterStream> one_byte_streaming_stream(
i::ScannerStream::For(&chunk_source,
v8::ScriptCompiler::StreamedSource::ONE_BYTE));
CHECK(!one_byte_streaming_stream->can_be_cloned());
TestCloneCharacterStream("12345678", one_byte_streaming_stream.get(), 8);
}
{
const char* chunks[] = {"1234", "5678", ""};
ChunkSource chunk_source(chunks);
std::unique_ptr<i::Utf16CharacterStream> utf8_streaming_stream(
i::ScannerStream::For(&chunk_source,
v8::ScriptCompiler::StreamedSource::UTF8));
CHECK(!utf8_streaming_stream->can_be_cloned());
CHECK(utf8_streaming_stream->can_be_cloned());
TestCloneCharacterStream("12345678", utf8_streaming_stream.get(), 8);
}
{
const char16_t* chunks[] = {u"1234", u"5678", u""};
ChunkSource chunk_source(chunks);
std::unique_ptr<i::Utf16CharacterStream> two_byte_streaming_stream(
i::ScannerStream::For(&chunk_source,
v8::ScriptCompiler::StreamedSource::TWO_BYTE));
CHECK(!two_byte_streaming_stream->can_be_cloned());
CHECK(two_byte_streaming_stream->can_be_cloned());
TestCloneCharacterStream("12345678", two_byte_streaming_stream.get(), 8);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment