Commit 6921a490 authored by Andreas Haas's avatar Andreas Haas Committed by Commit Bot

[wasm] Make StreamingDecoder abstract, introduce AsyncStreamingDecoder

The implementation of the StreamingDecoder depends on async compilation.
However, when the --single-threaded flag is set, async compilation is
not available. Therefore V8 does not support streaming compilation at
the moment if the --single-threaded flag is set.

This CL is the first step to support streaming compilation in
--single-threaded mode. This CL makes the StreamingDecoder an abstract
class, and the current implementation a sub-class called
AsyncStreamingDecoder. A follow-up CL will provided a second sub-class
implementation for streaming compilation in --single-threaded mode.

Bug: v8:10548
Change-Id: Ice5c01340d3df18f836a4a05d30571207ca8ccf6
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2208869
Commit-Queue: Andreas Haas <ahaas@chromium.org>
Reviewed-by: 's avatarThibaud Michaud <thibaudm@chromium.org>
Cr-Commit-Position: refs/heads/master@{#67931}
parent 6b228044
...@@ -1591,8 +1591,9 @@ class AsyncStreamingProcessor final : public StreamingProcessor { ...@@ -1591,8 +1591,9 @@ class AsyncStreamingProcessor final : public StreamingProcessor {
std::shared_ptr<StreamingDecoder> AsyncCompileJob::CreateStreamingDecoder() { std::shared_ptr<StreamingDecoder> AsyncCompileJob::CreateStreamingDecoder() {
DCHECK_NULL(stream_); DCHECK_NULL(stream_);
stream_.reset(new StreamingDecoder(std::make_unique<AsyncStreamingProcessor>( stream_ = StreamingDecoder::CreateAsyncStreamingDecoder(
this, isolate_->async_counters(), isolate_->allocator()))); std::make_unique<AsyncStreamingProcessor>(
this, isolate_->async_counters(), isolate_->allocator()));
return stream_; return stream_;
} }
......
...@@ -25,7 +25,223 @@ namespace v8 { ...@@ -25,7 +25,223 @@ namespace v8 {
namespace internal { namespace internal {
namespace wasm { namespace wasm {
void StreamingDecoder::OnBytesReceived(Vector<const uint8_t> bytes) { class V8_EXPORT_PRIVATE AsyncStreamingDecoder : public StreamingDecoder {
public:
explicit AsyncStreamingDecoder(std::unique_ptr<StreamingProcessor> processor);
// The buffer passed into OnBytesReceived is owned by the caller.
void OnBytesReceived(Vector<const uint8_t> bytes) override;
void Finish() override;
void Abort() override;
// Notify the StreamingDecoder that compilation ended and the
// StreamingProcessor should not be called anymore.
void NotifyCompilationEnded() override { Fail(); }
// Caching support.
// Sets the callback that is called after the module is fully compiled.
using ModuleCompiledCallback =
std::function<void(const std::shared_ptr<NativeModule>&)>;
void SetModuleCompiledCallback(ModuleCompiledCallback callback) override;
// Passes previously compiled module bytes from the embedder's cache.
bool SetCompiledModuleBytes(
Vector<const uint8_t> compiled_module_bytes) override;
void NotifyNativeModuleCreated(
const std::shared_ptr<NativeModule>& native_module) override;
Vector<const char> url() override { return VectorOf(url_); }
void SetUrl(Vector<const char> url) override {
url_.assign(url.begin(), url.length());
}
private:
// The SectionBuffer is the data object for the content of a single section.
// It stores all bytes of the section (including section id and section
// length), and the offset where the actual payload starts.
class SectionBuffer : public WireBytesStorage {
public:
// id: The section id.
// payload_length: The length of the payload.
// length_bytes: The section length, as it is encoded in the module bytes.
SectionBuffer(uint32_t module_offset, uint8_t id, size_t payload_length,
Vector<const uint8_t> length_bytes)
: // ID + length + payload
module_offset_(module_offset),
bytes_(OwnedVector<uint8_t>::New(1 + length_bytes.length() +
payload_length)),
payload_offset_(1 + length_bytes.length()) {
bytes_.start()[0] = id;
memcpy(bytes_.start() + 1, &length_bytes.first(), length_bytes.length());
}
SectionCode section_code() const {
return static_cast<SectionCode>(bytes_.start()[0]);
}
Vector<const uint8_t> GetCode(WireBytesRef ref) const final {
DCHECK_LE(module_offset_, ref.offset());
uint32_t offset_in_code_buffer = ref.offset() - module_offset_;
return bytes().SubVector(offset_in_code_buffer,
offset_in_code_buffer + ref.length());
}
uint32_t module_offset() const { return module_offset_; }
Vector<uint8_t> bytes() const { return bytes_.as_vector(); }
Vector<uint8_t> payload() const { return bytes() + payload_offset_; }
size_t length() const { return bytes_.size(); }
size_t payload_offset() const { return payload_offset_; }
private:
const uint32_t module_offset_;
const OwnedVector<uint8_t> bytes_;
const size_t payload_offset_;
};
// The decoding of a stream of wasm module bytes is organized in states. Each
// state provides a buffer to store the bytes required for the current state,
// information on how many bytes have already been received, how many bytes
// are needed, and a {Next} function which starts the next state once all
// bytes of the current state were received.
//
// The states change according to the following state diagram:
//
// Start
// |
// |
// v
// DecodeModuleHeader
// | _________________________________________
// | | |
// v v |
// DecodeSectionID --> DecodeSectionLength --> DecodeSectionPayload
// A |
// | | (if the section id == code)
// | v
// | DecodeNumberOfFunctions -- > DecodeFunctionLength
// | A |
// | | |
// | (after all functions were read) | v
// ------------------------------------- DecodeFunctionBody
//
class DecodingState {
public:
virtual ~DecodingState() = default;
// Reads the bytes for the current state and returns the number of read
// bytes.
virtual size_t ReadBytes(AsyncStreamingDecoder* streaming,
Vector<const uint8_t> bytes);
// Returns the next state of the streaming decoding.
virtual std::unique_ptr<DecodingState> Next(
AsyncStreamingDecoder* streaming) = 0;
// The buffer to store the received bytes.
virtual Vector<uint8_t> buffer() = 0;
// The number of bytes which were already received.
size_t offset() const { return offset_; }
void set_offset(size_t value) { offset_ = value; }
// A flag to indicate if finishing the streaming decoder is allowed without
// error.
virtual bool is_finishing_allowed() const { return false; }
private:
size_t offset_ = 0;
};
// Forward declarations of the concrete states. This is needed so that they
// can access private members of the AsyncStreamingDecoder.
class DecodeVarInt32;
class DecodeModuleHeader;
class DecodeSectionID;
class DecodeSectionLength;
class DecodeSectionPayload;
class DecodeNumberOfFunctions;
class DecodeFunctionLength;
class DecodeFunctionBody;
// Creates a buffer for the next section of the module.
SectionBuffer* CreateNewBuffer(uint32_t module_offset, uint8_t section_id,
size_t length,
Vector<const uint8_t> length_bytes);
std::unique_ptr<DecodingState> Error(const WasmError& error) {
if (ok()) processor_->OnError(error);
Fail();
return std::unique_ptr<DecodingState>(nullptr);
}
std::unique_ptr<DecodingState> Error(std::string message) {
return Error(WasmError{module_offset_ - 1, std::move(message)});
}
void ProcessModuleHeader() {
if (!ok()) return;
if (!processor_->ProcessModuleHeader(state_->buffer(), 0)) Fail();
}
void ProcessSection(SectionBuffer* buffer) {
if (!ok()) return;
if (!processor_->ProcessSection(
buffer->section_code(), buffer->payload(),
buffer->module_offset() +
static_cast<uint32_t>(buffer->payload_offset()))) {
Fail();
}
}
void StartCodeSection(int num_functions,
std::shared_ptr<WireBytesStorage> wire_bytes_storage,
int code_section_length) {
if (!ok()) return;
// The offset passed to {ProcessCodeSectionHeader} is an error offset and
// not the start offset of a buffer. Therefore we need the -1 here.
if (!processor_->ProcessCodeSectionHeader(
num_functions, module_offset() - 1, std::move(wire_bytes_storage),
code_section_length)) {
Fail();
}
}
void ProcessFunctionBody(Vector<const uint8_t> bytes,
uint32_t module_offset) {
if (!ok()) return;
if (!processor_->ProcessFunctionBody(bytes, module_offset)) Fail();
}
void Fail() {
// We reset the {processor_} field to represent failure. This also ensures
// that we do not accidentally call further methods on the processor after
// failure.
processor_.reset();
}
bool ok() const { return processor_ != nullptr; }
uint32_t module_offset() const { return module_offset_; }
bool deserializing() const { return !compiled_module_bytes_.empty(); }
std::unique_ptr<StreamingProcessor> processor_;
std::unique_ptr<DecodingState> state_;
std::vector<std::shared_ptr<SectionBuffer>> section_buffers_;
bool code_section_processed_ = false;
uint32_t module_offset_ = 0;
size_t total_size_ = 0;
std::string url_;
// Caching support.
ModuleCompiledCallback module_compiled_callback_ = nullptr;
// We need wire bytes in an array for deserializing cached modules.
std::vector<uint8_t> wire_bytes_for_deserializing_;
Vector<const uint8_t> compiled_module_bytes_;
DISALLOW_COPY_AND_ASSIGN(AsyncStreamingDecoder);
};
void AsyncStreamingDecoder::OnBytesReceived(Vector<const uint8_t> bytes) {
if (deserializing()) { if (deserializing()) {
wire_bytes_for_deserializing_.insert(wire_bytes_for_deserializing_.end(), wire_bytes_for_deserializing_.insert(wire_bytes_for_deserializing_.end(),
bytes.begin(), bytes.end()); bytes.begin(), bytes.end());
...@@ -50,8 +266,8 @@ void StreamingDecoder::OnBytesReceived(Vector<const uint8_t> bytes) { ...@@ -50,8 +266,8 @@ void StreamingDecoder::OnBytesReceived(Vector<const uint8_t> bytes) {
} }
} }
size_t StreamingDecoder::DecodingState::ReadBytes(StreamingDecoder* streaming, size_t AsyncStreamingDecoder::DecodingState::ReadBytes(
Vector<const uint8_t> bytes) { AsyncStreamingDecoder* streaming, Vector<const uint8_t> bytes) {
Vector<uint8_t> remaining_buf = buffer() + offset(); Vector<uint8_t> remaining_buf = buffer() + offset();
size_t num_bytes = std::min(bytes.size(), remaining_buf.size()); size_t num_bytes = std::min(bytes.size(), remaining_buf.size());
TRACE_STREAMING("ReadBytes(%zu bytes)\n", num_bytes); TRACE_STREAMING("ReadBytes(%zu bytes)\n", num_bytes);
...@@ -60,7 +276,7 @@ size_t StreamingDecoder::DecodingState::ReadBytes(StreamingDecoder* streaming, ...@@ -60,7 +276,7 @@ size_t StreamingDecoder::DecodingState::ReadBytes(StreamingDecoder* streaming,
return num_bytes; return num_bytes;
} }
void StreamingDecoder::Finish() { void AsyncStreamingDecoder::Finish() {
TRACE_STREAMING("Finish\n"); TRACE_STREAMING("Finish\n");
if (!ok()) return; if (!ok()) return;
...@@ -99,20 +315,20 @@ void StreamingDecoder::Finish() { ...@@ -99,20 +315,20 @@ void StreamingDecoder::Finish() {
processor_->OnFinishedStream(std::move(bytes)); processor_->OnFinishedStream(std::move(bytes));
} }
void StreamingDecoder::Abort() { void AsyncStreamingDecoder::Abort() {
TRACE_STREAMING("Abort\n"); TRACE_STREAMING("Abort\n");
if (!ok()) return; // Failed already. if (!ok()) return; // Failed already.
processor_->OnAbort(); processor_->OnAbort();
Fail(); Fail();
} }
void StreamingDecoder::SetModuleCompiledCallback( void AsyncStreamingDecoder::SetModuleCompiledCallback(
ModuleCompiledCallback callback) { ModuleCompiledCallback callback) {
DCHECK_NULL(module_compiled_callback_); DCHECK_NULL(module_compiled_callback_);
module_compiled_callback_ = callback; module_compiled_callback_ = callback;
} }
bool StreamingDecoder::SetCompiledModuleBytes( bool AsyncStreamingDecoder::SetCompiledModuleBytes(
Vector<const uint8_t> compiled_module_bytes) { Vector<const uint8_t> compiled_module_bytes) {
compiled_module_bytes_ = compiled_module_bytes; compiled_module_bytes_ = compiled_module_bytes;
return true; return true;
...@@ -122,8 +338,9 @@ namespace { ...@@ -122,8 +338,9 @@ namespace {
class TopTierCompiledCallback { class TopTierCompiledCallback {
public: public:
TopTierCompiledCallback(std::weak_ptr<NativeModule> native_module, TopTierCompiledCallback(
StreamingDecoder::ModuleCompiledCallback callback) std::weak_ptr<NativeModule> native_module,
AsyncStreamingDecoder::ModuleCompiledCallback callback)
: native_module_(std::move(native_module)), : native_module_(std::move(native_module)),
callback_(std::move(callback)) {} callback_(std::move(callback)) {}
...@@ -142,7 +359,7 @@ class TopTierCompiledCallback { ...@@ -142,7 +359,7 @@ class TopTierCompiledCallback {
private: private:
const std::weak_ptr<NativeModule> native_module_; const std::weak_ptr<NativeModule> native_module_;
const StreamingDecoder::ModuleCompiledCallback callback_; const AsyncStreamingDecoder::ModuleCompiledCallback callback_;
#ifdef DEBUG #ifdef DEBUG
mutable bool called_ = false; mutable bool called_ = false;
#endif #endif
...@@ -150,7 +367,7 @@ class TopTierCompiledCallback { ...@@ -150,7 +367,7 @@ class TopTierCompiledCallback {
} // namespace } // namespace
void StreamingDecoder::NotifyNativeModuleCreated( void AsyncStreamingDecoder::NotifyNativeModuleCreated(
const std::shared_ptr<NativeModule>& native_module) { const std::shared_ptr<NativeModule>& native_module) {
if (!module_compiled_callback_) return; if (!module_compiled_callback_) return;
auto* comp_state = native_module->compilation_state(); auto* comp_state = native_module->compilation_state();
...@@ -162,20 +379,21 @@ void StreamingDecoder::NotifyNativeModuleCreated( ...@@ -162,20 +379,21 @@ void StreamingDecoder::NotifyNativeModuleCreated(
// An abstract class to share code among the states which decode VarInts. This // An abstract class to share code among the states which decode VarInts. This
// class takes over the decoding of the VarInt and then calls the actual decode // class takes over the decoding of the VarInt and then calls the actual decode
// code with the decoded value. // code with the decoded value.
class StreamingDecoder::DecodeVarInt32 : public DecodingState { class AsyncStreamingDecoder::DecodeVarInt32 : public DecodingState {
public: public:
explicit DecodeVarInt32(size_t max_value, const char* field_name) explicit DecodeVarInt32(size_t max_value, const char* field_name)
: max_value_(max_value), field_name_(field_name) {} : max_value_(max_value), field_name_(field_name) {}
Vector<uint8_t> buffer() override { return ArrayVector(byte_buffer_); } Vector<uint8_t> buffer() override { return ArrayVector(byte_buffer_); }
size_t ReadBytes(StreamingDecoder* streaming, size_t ReadBytes(AsyncStreamingDecoder* streaming,
Vector<const uint8_t> bytes) override; Vector<const uint8_t> bytes) override;
std::unique_ptr<DecodingState> Next(StreamingDecoder* streaming) override; std::unique_ptr<DecodingState> Next(
AsyncStreamingDecoder* streaming) override;
virtual std::unique_ptr<DecodingState> NextWithValue( virtual std::unique_ptr<DecodingState> NextWithValue(
StreamingDecoder* streaming) = 0; AsyncStreamingDecoder* streaming) = 0;
protected: protected:
uint8_t byte_buffer_[kMaxVarInt32Size]; uint8_t byte_buffer_[kMaxVarInt32Size];
...@@ -187,11 +405,12 @@ class StreamingDecoder::DecodeVarInt32 : public DecodingState { ...@@ -187,11 +405,12 @@ class StreamingDecoder::DecodeVarInt32 : public DecodingState {
size_t bytes_consumed_ = 0; size_t bytes_consumed_ = 0;
}; };
class StreamingDecoder::DecodeModuleHeader : public DecodingState { class AsyncStreamingDecoder::DecodeModuleHeader : public DecodingState {
public: public:
Vector<uint8_t> buffer() override { return ArrayVector(byte_buffer_); } Vector<uint8_t> buffer() override { return ArrayVector(byte_buffer_); }
std::unique_ptr<DecodingState> Next(StreamingDecoder* streaming) override; std::unique_ptr<DecodingState> Next(
AsyncStreamingDecoder* streaming) override;
private: private:
// Checks if the magic bytes of the module header are correct. // Checks if the magic bytes of the module header are correct.
...@@ -202,7 +421,7 @@ class StreamingDecoder::DecodeModuleHeader : public DecodingState { ...@@ -202,7 +421,7 @@ class StreamingDecoder::DecodeModuleHeader : public DecodingState {
uint8_t byte_buffer_[kModuleHeaderSize]; uint8_t byte_buffer_[kModuleHeaderSize];
}; };
class StreamingDecoder::DecodeSectionID : public DecodingState { class AsyncStreamingDecoder::DecodeSectionID : public DecodingState {
public: public:
explicit DecodeSectionID(uint32_t module_offset) explicit DecodeSectionID(uint32_t module_offset)
: module_offset_(module_offset) {} : module_offset_(module_offset) {}
...@@ -210,7 +429,8 @@ class StreamingDecoder::DecodeSectionID : public DecodingState { ...@@ -210,7 +429,8 @@ class StreamingDecoder::DecodeSectionID : public DecodingState {
Vector<uint8_t> buffer() override { return {&id_, 1}; } Vector<uint8_t> buffer() override { return {&id_, 1}; }
bool is_finishing_allowed() const override { return true; } bool is_finishing_allowed() const override { return true; }
std::unique_ptr<DecodingState> Next(StreamingDecoder* streaming) override; std::unique_ptr<DecodingState> Next(
AsyncStreamingDecoder* streaming) override;
private: private:
uint8_t id_ = 0; uint8_t id_ = 0;
...@@ -218,7 +438,7 @@ class StreamingDecoder::DecodeSectionID : public DecodingState { ...@@ -218,7 +438,7 @@ class StreamingDecoder::DecodeSectionID : public DecodingState {
const uint32_t module_offset_; const uint32_t module_offset_;
}; };
class StreamingDecoder::DecodeSectionLength : public DecodeVarInt32 { class AsyncStreamingDecoder::DecodeSectionLength : public DecodeVarInt32 {
public: public:
explicit DecodeSectionLength(uint8_t id, uint32_t module_offset) explicit DecodeSectionLength(uint8_t id, uint32_t module_offset)
: DecodeVarInt32(kV8MaxWasmModuleSize, "section length"), : DecodeVarInt32(kV8MaxWasmModuleSize, "section length"),
...@@ -226,7 +446,7 @@ class StreamingDecoder::DecodeSectionLength : public DecodeVarInt32 { ...@@ -226,7 +446,7 @@ class StreamingDecoder::DecodeSectionLength : public DecodeVarInt32 {
module_offset_(module_offset) {} module_offset_(module_offset) {}
std::unique_ptr<DecodingState> NextWithValue( std::unique_ptr<DecodingState> NextWithValue(
StreamingDecoder* streaming) override; AsyncStreamingDecoder* streaming) override;
private: private:
const uint8_t section_id_; const uint8_t section_id_;
...@@ -234,33 +454,34 @@ class StreamingDecoder::DecodeSectionLength : public DecodeVarInt32 { ...@@ -234,33 +454,34 @@ class StreamingDecoder::DecodeSectionLength : public DecodeVarInt32 {
const uint32_t module_offset_; const uint32_t module_offset_;
}; };
class StreamingDecoder::DecodeSectionPayload : public DecodingState { class AsyncStreamingDecoder::DecodeSectionPayload : public DecodingState {
public: public:
explicit DecodeSectionPayload(SectionBuffer* section_buffer) explicit DecodeSectionPayload(SectionBuffer* section_buffer)
: section_buffer_(section_buffer) {} : section_buffer_(section_buffer) {}
Vector<uint8_t> buffer() override { return section_buffer_->payload(); } Vector<uint8_t> buffer() override { return section_buffer_->payload(); }
std::unique_ptr<DecodingState> Next(StreamingDecoder* streaming) override; std::unique_ptr<DecodingState> Next(
AsyncStreamingDecoder* streaming) override;
private: private:
SectionBuffer* const section_buffer_; SectionBuffer* const section_buffer_;
}; };
class StreamingDecoder::DecodeNumberOfFunctions : public DecodeVarInt32 { class AsyncStreamingDecoder::DecodeNumberOfFunctions : public DecodeVarInt32 {
public: public:
explicit DecodeNumberOfFunctions(SectionBuffer* section_buffer) explicit DecodeNumberOfFunctions(SectionBuffer* section_buffer)
: DecodeVarInt32(kV8MaxWasmFunctions, "functions count"), : DecodeVarInt32(kV8MaxWasmFunctions, "functions count"),
section_buffer_(section_buffer) {} section_buffer_(section_buffer) {}
std::unique_ptr<DecodingState> NextWithValue( std::unique_ptr<DecodingState> NextWithValue(
StreamingDecoder* streaming) override; AsyncStreamingDecoder* streaming) override;
private: private:
SectionBuffer* const section_buffer_; SectionBuffer* const section_buffer_;
}; };
class StreamingDecoder::DecodeFunctionLength : public DecodeVarInt32 { class AsyncStreamingDecoder::DecodeFunctionLength : public DecodeVarInt32 {
public: public:
explicit DecodeFunctionLength(SectionBuffer* section_buffer, explicit DecodeFunctionLength(SectionBuffer* section_buffer,
size_t buffer_offset, size_t buffer_offset,
...@@ -274,7 +495,7 @@ class StreamingDecoder::DecodeFunctionLength : public DecodeVarInt32 { ...@@ -274,7 +495,7 @@ class StreamingDecoder::DecodeFunctionLength : public DecodeVarInt32 {
} }
std::unique_ptr<DecodingState> NextWithValue( std::unique_ptr<DecodingState> NextWithValue(
StreamingDecoder* streaming) override; AsyncStreamingDecoder* streaming) override;
private: private:
SectionBuffer* const section_buffer_; SectionBuffer* const section_buffer_;
...@@ -282,7 +503,7 @@ class StreamingDecoder::DecodeFunctionLength : public DecodeVarInt32 { ...@@ -282,7 +503,7 @@ class StreamingDecoder::DecodeFunctionLength : public DecodeVarInt32 {
const size_t num_remaining_functions_; const size_t num_remaining_functions_;
}; };
class StreamingDecoder::DecodeFunctionBody : public DecodingState { class AsyncStreamingDecoder::DecodeFunctionBody : public DecodingState {
public: public:
explicit DecodeFunctionBody(SectionBuffer* section_buffer, explicit DecodeFunctionBody(SectionBuffer* section_buffer,
size_t buffer_offset, size_t function_body_length, size_t buffer_offset, size_t function_body_length,
...@@ -300,7 +521,8 @@ class StreamingDecoder::DecodeFunctionBody : public DecodingState { ...@@ -300,7 +521,8 @@ class StreamingDecoder::DecodeFunctionBody : public DecodingState {
return remaining_buffer.SubVector(0, function_body_length_); return remaining_buffer.SubVector(0, function_body_length_);
} }
std::unique_ptr<DecodingState> Next(StreamingDecoder* streaming) override; std::unique_ptr<DecodingState> Next(
AsyncStreamingDecoder* streaming) override;
private: private:
SectionBuffer* const section_buffer_; SectionBuffer* const section_buffer_;
...@@ -310,8 +532,8 @@ class StreamingDecoder::DecodeFunctionBody : public DecodingState { ...@@ -310,8 +532,8 @@ class StreamingDecoder::DecodeFunctionBody : public DecodingState {
const uint32_t module_offset_; const uint32_t module_offset_;
}; };
size_t StreamingDecoder::DecodeVarInt32::ReadBytes( size_t AsyncStreamingDecoder::DecodeVarInt32::ReadBytes(
StreamingDecoder* streaming, Vector<const uint8_t> bytes) { AsyncStreamingDecoder* streaming, Vector<const uint8_t> bytes) {
Vector<uint8_t> buf = buffer(); Vector<uint8_t> buf = buffer();
Vector<uint8_t> remaining_buf = buf + offset(); Vector<uint8_t> remaining_buf = buf + offset();
size_t new_bytes = std::min(bytes.size(), remaining_buf.size()); size_t new_bytes = std::min(bytes.size(), remaining_buf.size());
...@@ -344,8 +566,8 @@ size_t StreamingDecoder::DecodeVarInt32::ReadBytes( ...@@ -344,8 +566,8 @@ size_t StreamingDecoder::DecodeVarInt32::ReadBytes(
return new_bytes; return new_bytes;
} }
std::unique_ptr<StreamingDecoder::DecodingState> std::unique_ptr<AsyncStreamingDecoder::DecodingState>
StreamingDecoder::DecodeVarInt32::Next(StreamingDecoder* streaming) { AsyncStreamingDecoder::DecodeVarInt32::Next(AsyncStreamingDecoder* streaming) {
if (!streaming->ok()) return nullptr; if (!streaming->ok()) return nullptr;
if (value_ > max_value_) { if (value_ > max_value_) {
...@@ -358,16 +580,17 @@ StreamingDecoder::DecodeVarInt32::Next(StreamingDecoder* streaming) { ...@@ -358,16 +580,17 @@ StreamingDecoder::DecodeVarInt32::Next(StreamingDecoder* streaming) {
return NextWithValue(streaming); return NextWithValue(streaming);
} }
std::unique_ptr<StreamingDecoder::DecodingState> std::unique_ptr<AsyncStreamingDecoder::DecodingState>
StreamingDecoder::DecodeModuleHeader::Next(StreamingDecoder* streaming) { AsyncStreamingDecoder::DecodeModuleHeader::Next(
AsyncStreamingDecoder* streaming) {
TRACE_STREAMING("DecodeModuleHeader\n"); TRACE_STREAMING("DecodeModuleHeader\n");
streaming->ProcessModuleHeader(); streaming->ProcessModuleHeader();
if (!streaming->ok()) return nullptr; if (!streaming->ok()) return nullptr;
return std::make_unique<DecodeSectionID>(streaming->module_offset()); return std::make_unique<DecodeSectionID>(streaming->module_offset());
} }
std::unique_ptr<StreamingDecoder::DecodingState> std::unique_ptr<AsyncStreamingDecoder::DecodingState>
StreamingDecoder::DecodeSectionID::Next(StreamingDecoder* streaming) { AsyncStreamingDecoder::DecodeSectionID::Next(AsyncStreamingDecoder* streaming) {
TRACE_STREAMING("DecodeSectionID: %s section\n", TRACE_STREAMING("DecodeSectionID: %s section\n",
SectionName(static_cast<SectionCode>(id_))); SectionName(static_cast<SectionCode>(id_)));
if (id_ == SectionCode::kCodeSectionCode) { if (id_ == SectionCode::kCodeSectionCode) {
...@@ -383,9 +606,9 @@ StreamingDecoder::DecodeSectionID::Next(StreamingDecoder* streaming) { ...@@ -383,9 +606,9 @@ StreamingDecoder::DecodeSectionID::Next(StreamingDecoder* streaming) {
return std::make_unique<DecodeSectionLength>(id_, module_offset_); return std::make_unique<DecodeSectionLength>(id_, module_offset_);
} }
std::unique_ptr<StreamingDecoder::DecodingState> std::unique_ptr<AsyncStreamingDecoder::DecodingState>
StreamingDecoder::DecodeSectionLength::NextWithValue( AsyncStreamingDecoder::DecodeSectionLength::NextWithValue(
StreamingDecoder* streaming) { AsyncStreamingDecoder* streaming) {
TRACE_STREAMING("DecodeSectionLength(%zu)\n", value_); TRACE_STREAMING("DecodeSectionLength(%zu)\n", value_);
SectionBuffer* buf = SectionBuffer* buf =
streaming->CreateNewBuffer(module_offset_, section_id_, value_, streaming->CreateNewBuffer(module_offset_, section_id_, value_,
...@@ -410,17 +633,18 @@ StreamingDecoder::DecodeSectionLength::NextWithValue( ...@@ -410,17 +633,18 @@ StreamingDecoder::DecodeSectionLength::NextWithValue(
return std::make_unique<DecodeSectionPayload>(buf); return std::make_unique<DecodeSectionPayload>(buf);
} }
std::unique_ptr<StreamingDecoder::DecodingState> std::unique_ptr<AsyncStreamingDecoder::DecodingState>
StreamingDecoder::DecodeSectionPayload::Next(StreamingDecoder* streaming) { AsyncStreamingDecoder::DecodeSectionPayload::Next(
AsyncStreamingDecoder* streaming) {
TRACE_STREAMING("DecodeSectionPayload\n"); TRACE_STREAMING("DecodeSectionPayload\n");
streaming->ProcessSection(section_buffer_); streaming->ProcessSection(section_buffer_);
if (!streaming->ok()) return nullptr; if (!streaming->ok()) return nullptr;
return std::make_unique<DecodeSectionID>(streaming->module_offset()); return std::make_unique<DecodeSectionID>(streaming->module_offset());
} }
std::unique_ptr<StreamingDecoder::DecodingState> std::unique_ptr<AsyncStreamingDecoder::DecodingState>
StreamingDecoder::DecodeNumberOfFunctions::NextWithValue( AsyncStreamingDecoder::DecodeNumberOfFunctions::NextWithValue(
StreamingDecoder* streaming) { AsyncStreamingDecoder* streaming) {
TRACE_STREAMING("DecodeNumberOfFunctions(%zu)\n", value_); TRACE_STREAMING("DecodeNumberOfFunctions(%zu)\n", value_);
// Copy the bytes we read into the section buffer. // Copy the bytes we read into the section buffer.
Vector<uint8_t> payload_buf = section_buffer_->payload(); Vector<uint8_t> payload_buf = section_buffer_->payload();
...@@ -449,9 +673,9 @@ StreamingDecoder::DecodeNumberOfFunctions::NextWithValue( ...@@ -449,9 +673,9 @@ StreamingDecoder::DecodeNumberOfFunctions::NextWithValue(
value_); value_);
} }
std::unique_ptr<StreamingDecoder::DecodingState> std::unique_ptr<AsyncStreamingDecoder::DecodingState>
StreamingDecoder::DecodeFunctionLength::NextWithValue( AsyncStreamingDecoder::DecodeFunctionLength::NextWithValue(
StreamingDecoder* streaming) { AsyncStreamingDecoder* streaming) {
TRACE_STREAMING("DecodeFunctionLength(%zu)\n", value_); TRACE_STREAMING("DecodeFunctionLength(%zu)\n", value_);
// Copy the bytes we consumed into the section buffer. // Copy the bytes we consumed into the section buffer.
Vector<uint8_t> fun_length_buffer = section_buffer_->bytes() + buffer_offset_; Vector<uint8_t> fun_length_buffer = section_buffer_->bytes() + buffer_offset_;
...@@ -472,8 +696,9 @@ StreamingDecoder::DecodeFunctionLength::NextWithValue( ...@@ -472,8 +696,9 @@ StreamingDecoder::DecodeFunctionLength::NextWithValue(
num_remaining_functions_, streaming->module_offset()); num_remaining_functions_, streaming->module_offset());
} }
std::unique_ptr<StreamingDecoder::DecodingState> std::unique_ptr<AsyncStreamingDecoder::DecodingState>
StreamingDecoder::DecodeFunctionBody::Next(StreamingDecoder* streaming) { AsyncStreamingDecoder::DecodeFunctionBody::Next(
AsyncStreamingDecoder* streaming) {
TRACE_STREAMING("DecodeFunctionBody\n"); TRACE_STREAMING("DecodeFunctionBody\n");
streaming->ProcessFunctionBody(buffer(), module_offset_); streaming->ProcessFunctionBody(buffer(), module_offset_);
if (!streaming->ok()) return nullptr; if (!streaming->ok()) return nullptr;
...@@ -490,13 +715,13 @@ StreamingDecoder::DecodeFunctionBody::Next(StreamingDecoder* streaming) { ...@@ -490,13 +715,13 @@ StreamingDecoder::DecodeFunctionBody::Next(StreamingDecoder* streaming) {
return std::make_unique<DecodeSectionID>(streaming->module_offset()); return std::make_unique<DecodeSectionID>(streaming->module_offset());
} }
StreamingDecoder::StreamingDecoder( AsyncStreamingDecoder::AsyncStreamingDecoder(
std::unique_ptr<StreamingProcessor> processor) std::unique_ptr<StreamingProcessor> processor)
: processor_(std::move(processor)), : processor_(std::move(processor)),
// A module always starts with a module header. // A module always starts with a module header.
state_(new DecodeModuleHeader()) {} state_(new DecodeModuleHeader()) {}
StreamingDecoder::SectionBuffer* StreamingDecoder::CreateNewBuffer( AsyncStreamingDecoder::SectionBuffer* AsyncStreamingDecoder::CreateNewBuffer(
uint32_t module_offset, uint8_t section_id, size_t length, uint32_t module_offset, uint8_t section_id, size_t length,
Vector<const uint8_t> length_bytes) { Vector<const uint8_t> length_bytes) {
// Section buffers are allocated in the same order they appear in the module, // Section buffers are allocated in the same order they appear in the module,
...@@ -506,6 +731,11 @@ StreamingDecoder::SectionBuffer* StreamingDecoder::CreateNewBuffer( ...@@ -506,6 +731,11 @@ StreamingDecoder::SectionBuffer* StreamingDecoder::CreateNewBuffer(
return section_buffers_.back().get(); return section_buffers_.back().get();
} }
std::unique_ptr<StreamingDecoder> StreamingDecoder::CreateAsyncStreamingDecoder(
std::unique_ptr<StreamingProcessor> processor) {
return std::make_unique<AsyncStreamingDecoder>(std::move(processor));
}
} // namespace wasm } // namespace wasm
} // namespace internal } // namespace internal
} // namespace v8 } // namespace v8
......
...@@ -66,220 +66,35 @@ class V8_EXPORT_PRIVATE StreamingProcessor { ...@@ -66,220 +66,35 @@ class V8_EXPORT_PRIVATE StreamingProcessor {
// and function bodies. // and function bodies.
class V8_EXPORT_PRIVATE StreamingDecoder { class V8_EXPORT_PRIVATE StreamingDecoder {
public: public:
explicit StreamingDecoder(std::unique_ptr<StreamingProcessor> processor); virtual ~StreamingDecoder() = default;
// The buffer passed into OnBytesReceived is owned by the caller. // The buffer passed into OnBytesReceived is owned by the caller.
void OnBytesReceived(Vector<const uint8_t> bytes); virtual void OnBytesReceived(Vector<const uint8_t> bytes) = 0;
void Finish(); virtual void Finish() = 0;
void Abort(); virtual void Abort() = 0;
// Notify the StreamingDecoder that compilation ended and the // Notify the StreamingDecoder that compilation ended and the
// StreamingProcessor should not be called anymore. // StreamingProcessor should not be called anymore.
void NotifyCompilationEnded() { Fail(); } virtual void NotifyCompilationEnded() = 0;
// Caching support. // Caching support.
// Sets the callback that is called after the module is fully compiled. // Sets the callback that is called after the module is fully compiled.
using ModuleCompiledCallback = using ModuleCompiledCallback =
std::function<void(const std::shared_ptr<NativeModule>&)>; std::function<void(const std::shared_ptr<NativeModule>&)>;
void SetModuleCompiledCallback(ModuleCompiledCallback callback); virtual void SetModuleCompiledCallback(ModuleCompiledCallback callback) = 0;
// Passes previously compiled module bytes from the embedder's cache. // Passes previously compiled module bytes from the embedder's cache.
bool SetCompiledModuleBytes(Vector<const uint8_t> compiled_module_bytes); virtual bool SetCompiledModuleBytes(
Vector<const uint8_t> compiled_module_bytes) = 0;
void NotifyNativeModuleCreated( virtual void NotifyNativeModuleCreated(
const std::shared_ptr<NativeModule>& native_module); const std::shared_ptr<NativeModule>& native_module) = 0;
Vector<const char> url() { return VectorOf(url_); } virtual Vector<const char> url() = 0;
void SetUrl(Vector<const char> url) { virtual void SetUrl(Vector<const char> url) = 0;
url_.assign(url.begin(), url.length()); static std::unique_ptr<StreamingDecoder> CreateAsyncStreamingDecoder(
} std::unique_ptr<StreamingProcessor> processor);
private:
// TODO(ahaas): Put the whole private state of the StreamingDecoder into the
// cc file (PIMPL design pattern).
// The SectionBuffer is the data object for the content of a single section.
// It stores all bytes of the section (including section id and section
// length), and the offset where the actual payload starts.
class SectionBuffer : public WireBytesStorage {
public:
// id: The section id.
// payload_length: The length of the payload.
// length_bytes: The section length, as it is encoded in the module bytes.
SectionBuffer(uint32_t module_offset, uint8_t id, size_t payload_length,
Vector<const uint8_t> length_bytes)
: // ID + length + payload
module_offset_(module_offset),
bytes_(OwnedVector<uint8_t>::New(1 + length_bytes.length() +
payload_length)),
payload_offset_(1 + length_bytes.length()) {
bytes_.start()[0] = id;
memcpy(bytes_.start() + 1, &length_bytes.first(), length_bytes.length());
}
SectionCode section_code() const {
return static_cast<SectionCode>(bytes_.start()[0]);
}
Vector<const uint8_t> GetCode(WireBytesRef ref) const final {
DCHECK_LE(module_offset_, ref.offset());
uint32_t offset_in_code_buffer = ref.offset() - module_offset_;
return bytes().SubVector(offset_in_code_buffer,
offset_in_code_buffer + ref.length());
}
uint32_t module_offset() const { return module_offset_; }
Vector<uint8_t> bytes() const { return bytes_.as_vector(); }
Vector<uint8_t> payload() const { return bytes() + payload_offset_; }
size_t length() const { return bytes_.size(); }
size_t payload_offset() const { return payload_offset_; }
private:
const uint32_t module_offset_;
const OwnedVector<uint8_t> bytes_;
const size_t payload_offset_;
};
// The decoding of a stream of wasm module bytes is organized in states. Each
// state provides a buffer to store the bytes required for the current state,
// information on how many bytes have already been received, how many bytes
// are needed, and a {Next} function which starts the next state once all
// bytes of the current state were received.
//
// The states change according to the following state diagram:
//
// Start
// |
// |
// v
// DecodeModuleHeader
// | _________________________________________
// | | |
// v v |
// DecodeSectionID --> DecodeSectionLength --> DecodeSectionPayload
// A |
// | | (if the section id == code)
// | v
// | DecodeNumberOfFunctions -- > DecodeFunctionLength
// | A |
// | | |
// | (after all functions were read) | v
// ------------------------------------- DecodeFunctionBody
//
class DecodingState {
public:
virtual ~DecodingState() = default;
// Reads the bytes for the current state and returns the number of read
// bytes.
virtual size_t ReadBytes(StreamingDecoder* streaming,
Vector<const uint8_t> bytes);
// Returns the next state of the streaming decoding.
virtual std::unique_ptr<DecodingState> Next(
StreamingDecoder* streaming) = 0;
// The buffer to store the received bytes.
virtual Vector<uint8_t> buffer() = 0;
// The number of bytes which were already received.
size_t offset() const { return offset_; }
void set_offset(size_t value) { offset_ = value; }
// A flag to indicate if finishing the streaming decoder is allowed without
// error.
virtual bool is_finishing_allowed() const { return false; }
private:
size_t offset_ = 0;
};
// Forward declarations of the concrete states. This is needed so that they
// can access private members of the StreamingDecoder.
class DecodeVarInt32;
class DecodeModuleHeader;
class DecodeSectionID;
class DecodeSectionLength;
class DecodeSectionPayload;
class DecodeNumberOfFunctions;
class DecodeFunctionLength;
class DecodeFunctionBody;
// Creates a buffer for the next section of the module.
SectionBuffer* CreateNewBuffer(uint32_t module_offset, uint8_t section_id,
size_t length,
Vector<const uint8_t> length_bytes);
std::unique_ptr<DecodingState> Error(const WasmError& error) {
if (ok()) processor_->OnError(error);
Fail();
return std::unique_ptr<DecodingState>(nullptr);
}
std::unique_ptr<DecodingState> Error(std::string message) {
return Error(WasmError{module_offset_ - 1, std::move(message)});
}
void ProcessModuleHeader() {
if (!ok()) return;
if (!processor_->ProcessModuleHeader(state_->buffer(), 0)) Fail();
}
void ProcessSection(SectionBuffer* buffer) {
if (!ok()) return;
if (!processor_->ProcessSection(
buffer->section_code(), buffer->payload(),
buffer->module_offset() +
static_cast<uint32_t>(buffer->payload_offset()))) {
Fail();
}
}
void StartCodeSection(int num_functions,
std::shared_ptr<WireBytesStorage> wire_bytes_storage,
int code_section_length) {
if (!ok()) return;
// The offset passed to {ProcessCodeSectionHeader} is an error offset and
// not the start offset of a buffer. Therefore we need the -1 here.
if (!processor_->ProcessCodeSectionHeader(
num_functions, module_offset() - 1, std::move(wire_bytes_storage),
code_section_length)) {
Fail();
}
}
void ProcessFunctionBody(Vector<const uint8_t> bytes,
uint32_t module_offset) {
if (!ok()) return;
if (!processor_->ProcessFunctionBody(bytes, module_offset)) Fail();
}
void Fail() {
// We reset the {processor_} field to represent failure. This also ensures
// that we do not accidentally call further methods on the processor after
// failure.
processor_.reset();
}
bool ok() const { return processor_ != nullptr; }
uint32_t module_offset() const { return module_offset_; }
bool deserializing() const { return !compiled_module_bytes_.empty(); }
std::unique_ptr<StreamingProcessor> processor_;
std::unique_ptr<DecodingState> state_;
std::vector<std::shared_ptr<SectionBuffer>> section_buffers_;
bool code_section_processed_ = false;
uint32_t module_offset_ = 0;
size_t total_size_ = 0;
std::string url_;
// Caching support.
ModuleCompiledCallback module_compiled_callback_ = nullptr;
// We need wire bytes in an array for deserializing cached modules.
std::vector<uint8_t> wire_bytes_for_deserializing_;
Vector<const uint8_t> compiled_module_bytes_;
DISALLOW_COPY_AND_ASSIGN(StreamingDecoder);
}; };
} // namespace wasm } // namespace wasm
......
...@@ -99,11 +99,11 @@ class WasmStreamingDecoderTest : public ::testing::Test { ...@@ -99,11 +99,11 @@ class WasmStreamingDecoderTest : public ::testing::Test {
size_t expected_functions) { size_t expected_functions) {
for (int split = 0; split <= data.length(); ++split) { for (int split = 0; split <= data.length(); ++split) {
MockStreamingResult result; MockStreamingResult result;
StreamingDecoder stream( auto stream = StreamingDecoder::CreateAsyncStreamingDecoder(
std::make_unique<MockStreamingProcessor>(&result)); std::make_unique<MockStreamingProcessor>(&result));
stream.OnBytesReceived(data.SubVector(0, split)); stream->OnBytesReceived(data.SubVector(0, split));
stream.OnBytesReceived(data.SubVector(split, data.length())); stream->OnBytesReceived(data.SubVector(split, data.length()));
stream.Finish(); stream->Finish();
EXPECT_TRUE(result.ok()); EXPECT_TRUE(result.ok());
EXPECT_EQ(expected_sections, result.num_sections); EXPECT_EQ(expected_sections, result.num_sections);
EXPECT_EQ(expected_functions, result.num_functions); EXPECT_EQ(expected_functions, result.num_functions);
...@@ -115,11 +115,11 @@ class WasmStreamingDecoderTest : public ::testing::Test { ...@@ -115,11 +115,11 @@ class WasmStreamingDecoderTest : public ::testing::Test {
const char* message) { const char* message) {
for (int split = 0; split <= data.length(); ++split) { for (int split = 0; split <= data.length(); ++split) {
MockStreamingResult result; MockStreamingResult result;
StreamingDecoder stream( auto stream = StreamingDecoder::CreateAsyncStreamingDecoder(
std::make_unique<MockStreamingProcessor>(&result)); std::make_unique<MockStreamingProcessor>(&result));
stream.OnBytesReceived(data.SubVector(0, split)); stream->OnBytesReceived(data.SubVector(0, split));
stream.OnBytesReceived(data.SubVector(split, data.length())); stream->OnBytesReceived(data.SubVector(split, data.length()));
stream.Finish(); stream->Finish();
EXPECT_FALSE(result.ok()); EXPECT_FALSE(result.ok());
EXPECT_EQ(error_offset, result.error.offset()); EXPECT_EQ(error_offset, result.error.offset());
EXPECT_EQ(message, result.error.message()); EXPECT_EQ(message, result.error.message());
...@@ -129,8 +129,9 @@ class WasmStreamingDecoderTest : public ::testing::Test { ...@@ -129,8 +129,9 @@ class WasmStreamingDecoderTest : public ::testing::Test {
TEST_F(WasmStreamingDecoderTest, EmptyStream) { TEST_F(WasmStreamingDecoderTest, EmptyStream) {
MockStreamingResult result; MockStreamingResult result;
StreamingDecoder stream(std::make_unique<MockStreamingProcessor>(&result)); auto stream = StreamingDecoder::CreateAsyncStreamingDecoder(
stream.Finish(); std::make_unique<MockStreamingProcessor>(&result));
stream->Finish();
EXPECT_FALSE(result.ok()); EXPECT_FALSE(result.ok());
} }
...@@ -138,9 +139,10 @@ TEST_F(WasmStreamingDecoderTest, IncompleteModuleHeader) { ...@@ -138,9 +139,10 @@ TEST_F(WasmStreamingDecoderTest, IncompleteModuleHeader) {
const uint8_t data[] = {U32_LE(kWasmMagic), U32_LE(kWasmVersion)}; const uint8_t data[] = {U32_LE(kWasmMagic), U32_LE(kWasmVersion)};
{ {
MockStreamingResult result; MockStreamingResult result;
StreamingDecoder stream(std::make_unique<MockStreamingProcessor>(&result)); auto stream = StreamingDecoder::CreateAsyncStreamingDecoder(
stream.OnBytesReceived(VectorOf(data, 1)); std::make_unique<MockStreamingProcessor>(&result));
stream.Finish(); stream->OnBytesReceived(VectorOf(data, 1));
stream->Finish();
EXPECT_FALSE(result.ok()); EXPECT_FALSE(result.ok());
} }
for (uint32_t length = 1; length < sizeof(data); ++length) { for (uint32_t length = 1; length < sizeof(data); ++length) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment