streaming-decoder.cc 17.9 KB
Newer Older
1 2 3 4 5 6
// Copyright 2017 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/wasm/streaming-decoder.h"

7
#include "src/base/template-utils.h"
8
#include "src/handles.h"
9 10 11
#include "src/objects-inl.h"
#include "src/objects/descriptor-array.h"
#include "src/objects/dictionary.h"
12 13 14
#include "src/wasm/decoder.h"
#include "src/wasm/leb-helper.h"
#include "src/wasm/module-decoder.h"
15
#include "src/wasm/wasm-code-manager.h"
Marja Hölttä's avatar
Marja Hölttä committed
16
#include "src/wasm/wasm-limits.h"
17 18 19
#include "src/wasm/wasm-objects.h"
#include "src/wasm/wasm-result.h"

20 21 22 23 24
#define TRACE_STREAMING(...)                            \
  do {                                                  \
    if (FLAG_trace_wasm_streaming) PrintF(__VA_ARGS__); \
  } while (false)

25 26 27
namespace v8 {
namespace internal {
namespace wasm {
28

29
void StreamingDecoder::OnBytesReceived(Vector<const uint8_t> bytes) {
30 31 32 33 34 35
  if (deserializing()) {
    wire_bytes_for_deserializing_.insert(wire_bytes_for_deserializing_.end(),
                                         bytes.begin(), bytes.end());
    return;
  }

36
  TRACE_STREAMING("OnBytesReceived(%zu bytes)\n", bytes.size());
37

38
  size_t current = 0;
39
  while (ok() && current < bytes.size()) {
40 41
    size_t num_bytes =
        state_->ReadBytes(this, bytes.SubVector(current, bytes.size()));
42
    current += num_bytes;
43
    module_offset_ += num_bytes;
44
    if (state_->offset() == state_->buffer().size()) {
45 46 47
      state_ = state_->Next(this);
    }
  }
48
  total_size_ += bytes.size();
49 50 51
  if (ok()) {
    processor_->OnFinishedChunk();
  }
52 53 54 55
}

size_t StreamingDecoder::DecodingState::ReadBytes(StreamingDecoder* streaming,
                                                  Vector<const uint8_t> bytes) {
56 57
  Vector<uint8_t> remaining_buf = buffer() + offset();
  size_t num_bytes = std::min(bytes.size(), remaining_buf.size());
58
  TRACE_STREAMING("ReadBytes(%zu bytes)\n", num_bytes);
59
  memcpy(remaining_buf.start(), &bytes.first(), num_bytes);
60 61 62 63
  set_offset(offset() + num_bytes);
  return num_bytes;
}

64
void StreamingDecoder::Finish() {
65 66 67
  TRACE_STREAMING("Finish\n");
  if (!ok()) return;

68
  if (deserializing()) {
69
    Vector<const uint8_t> wire_bytes = VectorOf(wire_bytes_for_deserializing_);
70 71 72 73 74 75 76 77 78 79
    // Try to deserialize the module from wire bytes and module bytes.
    if (processor_->Deserialize(compiled_module_bytes_, wire_bytes)) return;

    // Deserialization failed. Restart decoding using |wire_bytes|.
    compiled_module_bytes_ = {};
    DCHECK(!deserializing());
    OnBytesReceived(wire_bytes);
    // The decoder has received all wire bytes; fall through and finish.
  }

80 81 82 83 84 85
  if (!state_->is_finishing_allowed()) {
    // The byte stream ended too early, we report an error.
    Error("unexpected end of stream");
    return;
  }

86 87
  OwnedVector<uint8_t> bytes = OwnedVector<uint8_t>::New(total_size_);
  uint8_t* cursor = bytes.start();
88
  {
89
#define BYTES(x) (x & 0xFF), (x >> 8) & 0xFF, (x >> 16) & 0xFF, (x >> 24) & 0xFF
90 91 92 93 94
    uint8_t module_header[]{BYTES(kWasmMagic), BYTES(kWasmVersion)};
#undef BYTES
    memcpy(cursor, module_header, arraysize(module_header));
    cursor += arraysize(module_header);
  }
95
  for (const auto& buffer : section_buffers_) {
96
    DCHECK_LE(cursor - bytes.start() + buffer->length(), total_size_);
97
    memcpy(cursor, buffer->bytes().start(), buffer->length());
98 99
    cursor += buffer->length();
  }
100
  processor_->OnFinishedStream(std::move(bytes));
101 102
}

103
void StreamingDecoder::Abort() {
104
  TRACE_STREAMING("Abort\n");
105
  if (!ok()) return;  // Failed already.
106 107
  processor_->OnAbort();
  Fail();
108 109
}

110 111 112 113 114 115 116 117 118 119 120 121
void StreamingDecoder::SetModuleCompiledCallback(
    ModuleCompiledCallback callback) {
  DCHECK_NULL(module_compiled_callback_);
  module_compiled_callback_ = callback;
}

bool StreamingDecoder::SetCompiledModuleBytes(
    Vector<const uint8_t> compiled_module_bytes) {
  compiled_module_bytes_ = compiled_module_bytes;
  return true;
}

122 123 124 125
namespace {

class TopTierCompiledCallback {
 public:
126
  TopTierCompiledCallback(std::weak_ptr<NativeModule> native_module,
127 128 129 130
                          StreamingDecoder::ModuleCompiledCallback callback)
      : native_module_(std::move(native_module)),
        callback_(std::move(callback)) {}

131
  void operator()(CompilationEvent event) const {
132
    if (event != CompilationEvent::kFinishedTopTierCompilation) return;
133 134 135 136 137
    // If the native module is still alive, get back a shared ptr and call the
    // callback.
    if (std::shared_ptr<NativeModule> native_module = native_module_.lock()) {
      callback_(native_module);
    }
138 139 140 141 142 143 144
#ifdef DEBUG
    DCHECK(!called_);
    called_ = true;
#endif
  }

 private:
145
  const std::weak_ptr<NativeModule> native_module_;
146 147 148 149 150 151 152 153
  const StreamingDecoder::ModuleCompiledCallback callback_;
#ifdef DEBUG
  mutable bool called_ = false;
#endif
};

}  // namespace

154 155
void StreamingDecoder::NotifyNativeModuleCreated(
    const std::shared_ptr<NativeModule>& native_module) {
156
  if (!module_compiled_callback_) return;
157
  auto* comp_state = native_module->compilation_state();
158 159
  comp_state->AddCallback(TopTierCompiledCallback{
      std::move(native_module), std::move(module_compiled_callback_)});
160
  module_compiled_callback_ = {};
161 162
}

163 164 165 166 167
// An abstract class to share code among the states which decode VarInts. This
// class takes over the decoding of the VarInt and then calls the actual decode
// code with the decoded value.
class StreamingDecoder::DecodeVarInt32 : public DecodingState {
 public:
168 169
  explicit DecodeVarInt32(size_t max_value, const char* field_name)
      : max_value_(max_value), field_name_(field_name) {}
170

171
  Vector<uint8_t> buffer() override { return ArrayVector(byte_buffer_); }
172 173 174 175 176 177 178 179 180

  size_t ReadBytes(StreamingDecoder* streaming,
                   Vector<const uint8_t> bytes) override;

  std::unique_ptr<DecodingState> Next(StreamingDecoder* streaming) override;

  virtual std::unique_ptr<DecodingState> NextWithValue(
      StreamingDecoder* streaming) = 0;

181
 protected:
182 183 184
  uint8_t byte_buffer_[kMaxVarInt32Size];
  // The maximum valid value decoded in this state. {Next} returns an error if
  // this value is exceeded.
185 186
  const size_t max_value_;
  const char* const field_name_;
187
  size_t value_ = 0;
188
  size_t bytes_consumed_ = 0;
189 190 191 192
};

class StreamingDecoder::DecodeModuleHeader : public DecodingState {
 public:
193
  Vector<uint8_t> buffer() override { return ArrayVector(byte_buffer_); }
194 195 196 197 198 199 200 201 202 203 204 205 206 207

  std::unique_ptr<DecodingState> Next(StreamingDecoder* streaming) override;

 private:
  // Checks if the magic bytes of the module header are correct.
  void CheckHeader(Decoder* decoder);

  // The size of the module header.
  static constexpr size_t kModuleHeaderSize = 8;
  uint8_t byte_buffer_[kModuleHeaderSize];
};

class StreamingDecoder::DecodeSectionID : public DecodingState {
 public:
208 209 210
  explicit DecodeSectionID(uint32_t module_offset)
      : module_offset_(module_offset) {}

211
  Vector<uint8_t> buffer() override { return {&id_, 1}; }
212 213 214 215 216 217
  bool is_finishing_allowed() const override { return true; }

  std::unique_ptr<DecodingState> Next(StreamingDecoder* streaming) override;

 private:
  uint8_t id_ = 0;
218
  // The start offset of this section in the module.
219
  const uint32_t module_offset_;
220 221 222 223
};

class StreamingDecoder::DecodeSectionLength : public DecodeVarInt32 {
 public:
224 225 226 227
  explicit DecodeSectionLength(uint8_t id, uint32_t module_offset)
      : DecodeVarInt32(kV8MaxWasmModuleSize, "section length"),
        section_id_(id),
        module_offset_(module_offset) {}
228 229 230 231 232

  std::unique_ptr<DecodingState> NextWithValue(
      StreamingDecoder* streaming) override;

 private:
233
  const uint8_t section_id_;
234
  // The start offset of this section in the module.
235
  const uint32_t module_offset_;
236 237 238 239 240 241 242
};

class StreamingDecoder::DecodeSectionPayload : public DecodingState {
 public:
  explicit DecodeSectionPayload(SectionBuffer* section_buffer)
      : section_buffer_(section_buffer) {}

243
  Vector<uint8_t> buffer() override { return section_buffer_->payload(); }
244 245 246 247

  std::unique_ptr<DecodingState> Next(StreamingDecoder* streaming) override;

 private:
248
  SectionBuffer* const section_buffer_;
249 250 251 252 253
};

class StreamingDecoder::DecodeNumberOfFunctions : public DecodeVarInt32 {
 public:
  explicit DecodeNumberOfFunctions(SectionBuffer* section_buffer)
254 255
      : DecodeVarInt32(kV8MaxWasmFunctions, "functions count"),
        section_buffer_(section_buffer) {}
256 257 258 259 260

  std::unique_ptr<DecodingState> NextWithValue(
      StreamingDecoder* streaming) override;

 private:
261
  SectionBuffer* const section_buffer_;
262 263 264 265 266 267 268
};

class StreamingDecoder::DecodeFunctionLength : public DecodeVarInt32 {
 public:
  explicit DecodeFunctionLength(SectionBuffer* section_buffer,
                                size_t buffer_offset,
                                size_t num_remaining_functions)
269
      : DecodeVarInt32(kV8MaxWasmFunctionSize, "body size"),
270 271 272 273 274 275 276 277 278 279 280
        section_buffer_(section_buffer),
        buffer_offset_(buffer_offset),
        // We are reading a new function, so one function less is remaining.
        num_remaining_functions_(num_remaining_functions - 1) {
    DCHECK_GT(num_remaining_functions, 0);
  }

  std::unique_ptr<DecodingState> NextWithValue(
      StreamingDecoder* streaming) override;

 private:
281 282 283
  SectionBuffer* const section_buffer_;
  const size_t buffer_offset_;
  const size_t num_remaining_functions_;
284 285 286 287 288
};

class StreamingDecoder::DecodeFunctionBody : public DecodingState {
 public:
  explicit DecodeFunctionBody(SectionBuffer* section_buffer,
289
                              size_t buffer_offset, size_t function_body_length,
290 291
                              size_t num_remaining_functions,
                              uint32_t module_offset)
292 293
      : section_buffer_(section_buffer),
        buffer_offset_(buffer_offset),
294
        function_body_length_(function_body_length),
295 296
        num_remaining_functions_(num_remaining_functions),
        module_offset_(module_offset) {}
297

298 299 300 301
  Vector<uint8_t> buffer() override {
    Vector<uint8_t> remaining_buffer =
        section_buffer_->bytes() + buffer_offset_;
    return remaining_buffer.SubVector(0, function_body_length_);
302 303 304 305 306
  }

  std::unique_ptr<DecodingState> Next(StreamingDecoder* streaming) override;

 private:
307 308 309 310 311
  SectionBuffer* const section_buffer_;
  const size_t buffer_offset_;
  const size_t function_body_length_;
  const size_t num_remaining_functions_;
  const uint32_t module_offset_;
312 313 314 315
};

size_t StreamingDecoder::DecodeVarInt32::ReadBytes(
    StreamingDecoder* streaming, Vector<const uint8_t> bytes) {
316 317 318
  Vector<uint8_t> buf = buffer();
  Vector<uint8_t> remaining_buf = buf + offset();
  size_t new_bytes = std::min(bytes.size(), remaining_buf.size());
319
  TRACE_STREAMING("ReadBytes of a VarInt\n");
320 321 322
  memcpy(remaining_buf.start(), &bytes.first(), new_bytes);
  buf.Truncate(offset() + new_bytes);
  Decoder decoder(buf, streaming->module_offset());
323
  value_ = decoder.consume_u32v(field_name_);
324
  // The number of bytes we actually needed to read.
325 326
  DCHECK_GT(decoder.pc(), buffer().start());
  bytes_consumed_ = static_cast<size_t>(decoder.pc() - buf.start());
327
  TRACE_STREAMING("  ==> %zu bytes consumed\n", bytes_consumed_);
328 329

  if (decoder.failed()) {
330
    if (new_bytes == remaining_buf.size()) {
331
      // We only report an error if we read all bytes.
332
      streaming->Error(decoder.error());
333
    }
334 335
    set_offset(offset() + new_bytes);
    return new_bytes;
336
  }
337 338

  // We read all the bytes we needed.
339 340 341 342 343 344
  DCHECK_GT(bytes_consumed_, offset());
  new_bytes = bytes_consumed_ - offset();
  // Set the offset to the buffer size to signal that we are at the end of this
  // section.
  set_offset(buffer().size());
  return new_bytes;
345 346 347 348
}

std::unique_ptr<StreamingDecoder::DecodingState>
StreamingDecoder::DecodeVarInt32::Next(StreamingDecoder* streaming) {
349 350
  if (!streaming->ok()) return nullptr;

351
  if (value_ > max_value_) {
352
    std::ostringstream oss;
353
    oss << "function size > maximum function size: " << value_ << " < "
354 355
        << max_value_;
    return streaming->Error(oss.str());
356 357 358 359 360 361 362
  }

  return NextWithValue(streaming);
}

std::unique_ptr<StreamingDecoder::DecodingState>
StreamingDecoder::DecodeModuleHeader::Next(StreamingDecoder* streaming) {
363
  TRACE_STREAMING("DecodeModuleHeader\n");
364
  streaming->ProcessModuleHeader();
365 366
  if (!streaming->ok()) return nullptr;
  return base::make_unique<DecodeSectionID>(streaming->module_offset());
367 368 369 370
}

std::unique_ptr<StreamingDecoder::DecodingState>
StreamingDecoder::DecodeSectionID::Next(StreamingDecoder* streaming) {
371
  TRACE_STREAMING("DecodeSectionID: %s section\n",
372 373
                  SectionName(static_cast<SectionCode>(id_)));
  return base::make_unique<DecodeSectionLength>(id_, module_offset_);
374 375 376 377 378
}

std::unique_ptr<StreamingDecoder::DecodingState>
StreamingDecoder::DecodeSectionLength::NextWithValue(
    StreamingDecoder* streaming) {
379
  TRACE_STREAMING("DecodeSectionLength(%zu)\n", value_);
380 381 382
  SectionBuffer* buf =
      streaming->CreateNewBuffer(module_offset_, section_id_, value_,
                                 buffer().SubVector(0, bytes_consumed_));
383
  DCHECK_NOT_NULL(buf);
384 385
  if (value_ == 0) {
    if (section_id_ == SectionCode::kCodeSectionCode) {
386
      return streaming->Error("code section cannot have size 0");
387
    }
388 389
    // Process section without payload as well, to enforce section order and
    // other feature checks specific to each individual section.
390 391 392 393
    streaming->ProcessSection(buf);
    if (!streaming->ok()) return nullptr;
    // There is no payload, we go to the next section immediately.
    return base::make_unique<DecodeSectionID>(streaming->module_offset_);
394
  } else {
395
    if (section_id_ == SectionCode::kCodeSectionCode) {
396 397 398 399 400 401 402 403
      // Explicitly check for multiple code sections as module decoder never
      // sees the code section and hence cannot track this section.
      if (streaming->code_section_processed_) {
        // TODO(mstarzinger): This error message (and other in this class) is
        // different for non-streaming decoding. Bring them in sync and test.
        return streaming->Error("code section can only appear once");
      }
      streaming->code_section_processed_ = true;
404 405 406 407
      // We reached the code section. All functions of the code section are put
      // into the same SectionBuffer.
      return base::make_unique<DecodeNumberOfFunctions>(buf);
    }
408
    return base::make_unique<DecodeSectionPayload>(buf);
409 410 411 412 413
  }
}

std::unique_ptr<StreamingDecoder::DecodingState>
StreamingDecoder::DecodeSectionPayload::Next(StreamingDecoder* streaming) {
414
  TRACE_STREAMING("DecodeSectionPayload\n");
415
  streaming->ProcessSection(section_buffer_);
416 417
  if (!streaming->ok()) return nullptr;
  return base::make_unique<DecodeSectionID>(streaming->module_offset());
418 419 420 421 422
}

std::unique_ptr<StreamingDecoder::DecodingState>
StreamingDecoder::DecodeNumberOfFunctions::NextWithValue(
    StreamingDecoder* streaming) {
423
  TRACE_STREAMING("DecodeNumberOfFunctions(%zu)\n", value_);
424
  // Copy the bytes we read into the section buffer.
425 426
  Vector<uint8_t> payload_buf = section_buffer_->payload();
  if (payload_buf.size() < bytes_consumed_) {
427
    return streaming->Error("invalid code section length");
428
  }
429
  memcpy(payload_buf.start(), buffer().start(), bytes_consumed_);
430 431

  // {value} is the number of functions.
432
  if (value_ == 0) {
433
    if (payload_buf.size() != bytes_consumed_) {
434
      return streaming->Error("not all code section bytes were used");
435
    }
436
    return base::make_unique<DecodeSectionID>(streaming->module_offset());
437
  }
438

439 440 441
  DCHECK_GE(kMaxInt, value_);
  streaming->StartCodeSection(static_cast<int>(value_),
                              streaming->section_buffers_.back());
442 443 444 445
  if (!streaming->ok()) return nullptr;
  return base::make_unique<DecodeFunctionLength>(
      section_buffer_, section_buffer_->payload_offset() + bytes_consumed_,
      value_);
446 447 448 449 450
}

std::unique_ptr<StreamingDecoder::DecodingState>
StreamingDecoder::DecodeFunctionLength::NextWithValue(
    StreamingDecoder* streaming) {
451
  TRACE_STREAMING("DecodeFunctionLength(%zu)\n", value_);
452
  // Copy the bytes we consumed into the section buffer.
453 454
  Vector<uint8_t> fun_length_buffer = section_buffer_->bytes() + buffer_offset_;
  if (fun_length_buffer.size() < bytes_consumed_) {
455
    return streaming->Error("read past code section end");
456
  }
457
  memcpy(fun_length_buffer.start(), buffer().start(), bytes_consumed_);
458 459

  // {value} is the length of the function.
460
  if (value_ == 0) return streaming->Error("invalid function length (0)");
461 462 463

  if (buffer_offset_ + bytes_consumed_ + value_ > section_buffer_->length()) {
    return streaming->Error("not enough code section bytes");
464 465
  }

466
  return base::make_unique<DecodeFunctionBody>(
467 468
      section_buffer_, buffer_offset_ + bytes_consumed_, value_,
      num_remaining_functions_, streaming->module_offset());
469 470 471 472
}

std::unique_ptr<StreamingDecoder::DecodingState>
StreamingDecoder::DecodeFunctionBody::Next(StreamingDecoder* streaming) {
473
  TRACE_STREAMING("DecodeFunctionBody\n");
474
  streaming->ProcessFunctionBody(buffer(), module_offset_);
475 476
  if (!streaming->ok()) return nullptr;

477
  size_t end_offset = buffer_offset_ + function_body_length_;
478
  if (num_remaining_functions_ > 0) {
479 480
    return base::make_unique<DecodeFunctionLength>(section_buffer_, end_offset,
                                                   num_remaining_functions_);
481
  }
482
  // We just read the last function body. Continue with the next section.
483
  if (end_offset != section_buffer_->length()) {
484 485 486
    return streaming->Error("not all code section bytes were used");
  }
  return base::make_unique<DecodeSectionID>(streaming->module_offset());
487 488
}

489 490 491
StreamingDecoder::StreamingDecoder(
    std::unique_ptr<StreamingProcessor> processor)
    : processor_(std::move(processor)),
492
      // A module always starts with a module header.
493
      state_(new DecodeModuleHeader()) {}
494 495 496 497

StreamingDecoder::SectionBuffer* StreamingDecoder::CreateNewBuffer(
    uint32_t module_offset, uint8_t section_id, size_t length,
    Vector<const uint8_t> length_bytes) {
498 499
  // Section buffers are allocated in the same order they appear in the module,
  // they will be processed and later on concatenated in that same order.
500 501 502 503 504
  section_buffers_.emplace_back(std::make_shared<SectionBuffer>(
      module_offset, section_id, length, length_bytes));
  return section_buffers_.back().get();
}

505 506 507
}  // namespace wasm
}  // namespace internal
}  // namespace v8
508 509

#undef TRACE_STREAMING