Commit 969aba8f authored by yurys@chromium.org's avatar yurys@chromium.org

Rewrite SamplingCircularQueue

The new implementation:
* uses MemoryBarriers to make sure up-to-date data is accessed on both producer and consumer threads
* does not allow records to be overwritten
* has no notion of chunks; instead, each entry is aligned on a cache line boundary

BUG=v8:2814
R=bmeurer@chromium.org

Review URL: https://codereview.chromium.org/22849002

git-svn-id: http://v8.googlecode.com/svn/branches/bleeding_edge@16284 ce2b1a6d-e550-0410-aec6-3dcde31c8c00
parent cab5e052
...@@ -33,30 +33,60 @@ ...@@ -33,30 +33,60 @@
namespace v8 { namespace v8 {
namespace internal { namespace internal {
// Constructs a queue whose producer and consumer cursors both start at the
// first entry of the internal buffer.
template<typename T, unsigned L>
SamplingCircularQueue<T, L>::SamplingCircularQueue()
    : enqueue_pos_(buffer_), dequeue_pos_(buffer_) {}
// Nothing to do here: the destructor body performs no explicit cleanup.
template<typename T, unsigned L>
SamplingCircularQueue<T, L>::~SamplingCircularQueue() {}
// Returns a pointer to the record at the current dequeue position when that
// entry is marked kFull, and NULL otherwise. The dequeue position itself is
// not changed by this call.
template<typename T, unsigned L>
T* SamplingCircularQueue<T, L>::StartDequeue() {
  MemoryBarrier();
  // Anything other than kFull means there is no record to hand out here.
  if (Acquire_Load(&dequeue_pos_->marker) != kFull) return NULL;
  return &dequeue_pos_->record;
}
// Marks the entry at the current dequeue position as kEmpty and then moves
// the dequeue position on to the following entry.
template<typename T, unsigned L>
void SamplingCircularQueue<T, L>::FinishDequeue() {
  Entry* const consumed = dequeue_pos_;
  // The marker is released first; only afterwards is the cursor advanced.
  Release_Store(&consumed->marker, kEmpty);
  dequeue_pos_ = Next(consumed);
}
void* SamplingCircularQueue::Enqueue() {
if (producer_pos_->enqueue_pos == producer_pos_->next_chunk_pos) { template<typename T, unsigned L>
if (producer_pos_->enqueue_pos == buffer_ + buffer_size_) { T* SamplingCircularQueue<T, L>::StartEnqueue() {
producer_pos_->next_chunk_pos = buffer_; MemoryBarrier();
producer_pos_->enqueue_pos = buffer_; if (Acquire_Load(&enqueue_pos_->marker) == kEmpty) {
} return &enqueue_pos_->record;
Acquire_Store(producer_pos_->next_chunk_pos, kEnqueueStarted);
// Skip marker.
producer_pos_->enqueue_pos += 1;
producer_pos_->next_chunk_pos += chunk_size_;
} }
void* result = producer_pos_->enqueue_pos; return NULL;
producer_pos_->enqueue_pos += record_size_;
return result;
} }
void SamplingCircularQueue::WrapPositionIfNeeded( template<typename T, unsigned L>
SamplingCircularQueue::Cell** pos) { void SamplingCircularQueue<T, L>::FinishEnqueue() {
if (*pos == buffer_ + buffer_size_) *pos = buffer_; Release_Store(&enqueue_pos_->marker, kFull);
enqueue_pos_ = Next(enqueue_pos_);
} }
// Returns the entry that follows |entry| in the buffer, wrapping from the
// last of the L entries back to the first one.
template<typename T, unsigned L>
typename SamplingCircularQueue<T, L>::Entry* SamplingCircularQueue<T, L>::Next(
    Entry* entry) {
  Entry* const buffer_end = buffer_ + L;  // One past the last entry.
  Entry* successor = entry + 1;
  if (successor == buffer_end) successor = buffer_;
  return successor;
}
} } // namespace v8::internal } } // namespace v8::internal
#endif // V8_CIRCULAR_QUEUE_INL_H_ #endif // V8_CIRCULAR_QUEUE_INL_H_
// Copyright 2010 the V8 project authors. All rights reserved.
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following
// disclaimer in the documentation and/or other materials provided
// with the distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "v8.h"
#include "circular-queue-inl.h"
namespace v8 {
namespace internal {
// Builds the chunked queue.
//
// record_size_in_bytes:        size of one record; asserted below to be a
//                              whole number of Cells.
// desired_chunk_size_in_bytes: requested chunk size; it is rounded down to a
//                              whole number of records and then extended by
//                              one Cell that holds the chunk marker.
// buffer_size_in_chunks:       number of chunks in the buffer; must be > 2.
//
// Allocates the Cell buffer and a separate byte array that holds the
// producer and consumer position structures.
SamplingCircularQueue::SamplingCircularQueue(size_t record_size_in_bytes,
size_t desired_chunk_size_in_bytes,
unsigned buffer_size_in_chunks)
// Sizes with a trailing "_in_bytes_" are in bytes; the others are in Cells.
: record_size_(record_size_in_bytes / sizeof(Cell)),
chunk_size_in_bytes_(desired_chunk_size_in_bytes / record_size_in_bytes *
record_size_in_bytes + sizeof(Cell)),
chunk_size_(chunk_size_in_bytes_ / sizeof(Cell)),
buffer_size_(chunk_size_ * buffer_size_in_chunks),
buffer_(NewArray<Cell>(buffer_size_)) {
// The divisions above must have been exact.
ASSERT(record_size_ * sizeof(Cell) == record_size_in_bytes);
ASSERT(chunk_size_ * sizeof(Cell) == chunk_size_in_bytes_);
// The consumer's poll position starts two chunks into the buffer (see the
// end of this constructor), so more than two chunks are required.
ASSERT(buffer_size_in_chunks > 2);
// Mark all chunks as clear by writing kClear into the first Cell (the
// marker) of each chunk.
for (size_t i = 0; i < buffer_size_; i += chunk_size_) {
buffer_[i] = kClear;
}
// Lay out the producer and consumer position structures each on their own
// cache line to avoid cache line thrashing due to simultaneous
// updates of positions by different processor cores.
// The first RoundUp term adds one cache line of slack so that the start of
// the allocation can be rounded up to a cache line boundary.
const int positions_size =
RoundUp(1, kProcessorCacheLineSize) +
RoundUp(static_cast<int>(sizeof(ProducerPosition)),
kProcessorCacheLineSize) +
RoundUp(static_cast<int>(sizeof(ConsumerPosition)),
kProcessorCacheLineSize);
positions_ = NewArray<byte>(positions_size);
producer_pos_ = reinterpret_cast<ProducerPosition*>(
RoundUp(positions_, kProcessorCacheLineSize));
producer_pos_->next_chunk_pos = buffer_;
producer_pos_->enqueue_pos = buffer_;
// The consumer structure is placed exactly one cache line after the producer
// structure.
// NOTE(review): this presumably relies on sizeof(ProducerPosition) not
// exceeding kProcessorCacheLineSize -- confirm.
consumer_pos_ = reinterpret_cast<ConsumerPosition*>(
reinterpret_cast<byte*>(producer_pos_) + kProcessorCacheLineSize);
// The consumer structure must still fit inside the allocation.
ASSERT(reinterpret_cast<byte*>(consumer_pos_ + 1) <=
positions_ + positions_size);
consumer_pos_->dequeue_chunk_pos = buffer_;
// The distance is meant to ensure that producer and consumer never step on
// each other's chunks, and to help eviction of produced data from
// the CPU cache (provided that the chunk size is bigger than the cache).
const size_t producer_consumer_distance = (2 * chunk_size_);
consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance;
// NULL means that no chunk is currently being consumed.
consumer_pos_->dequeue_pos = NULL;
}
// Frees the two arrays owned by the queue: the position storage and the
// Cell buffer.
SamplingCircularQueue::~SamplingCircularQueue() {
  DeleteArray(positions_);
  DeleteArray(buffer_);
}
void* SamplingCircularQueue::StartDequeue() {
if (consumer_pos_->dequeue_pos != NULL) {
return consumer_pos_->dequeue_pos;
} else {
if (Acquire_Load(consumer_pos_->dequeue_chunk_poll_pos) != kClear) {
// Skip marker.
consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos + 1;
consumer_pos_->dequeue_end_pos =
consumer_pos_->dequeue_chunk_pos + chunk_size_;
return consumer_pos_->dequeue_pos;
} else {
return NULL;
}
}
}
// Steps past the record that was just consumed. When that was the last
// record of the chunk, the chunk marker is reset to kClear and both the
// dequeue chunk position and the poll position move on by one chunk,
// wrapping around at the end of the buffer.
void SamplingCircularQueue::FinishDequeue() {
  consumer_pos_->dequeue_pos += record_size_;
  const bool chunk_exhausted =
      consumer_pos_->dequeue_pos >= consumer_pos_->dequeue_end_pos;
  if (!chunk_exhausted) return;

  // Leave the finished chunk: no dequeue is in progress any more, and the
  // chunk is marked as clear again.
  consumer_pos_->dequeue_pos = NULL;
  *consumer_pos_->dequeue_chunk_pos = kClear;

  consumer_pos_->dequeue_chunk_pos += chunk_size_;
  WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_pos);

  consumer_pos_->dequeue_chunk_poll_pos += chunk_size_;
  WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_poll_pos);
}
// Makes the consumer poll the marker of the very chunk it dequeues from,
// removing the distance between the poll position and the dequeue chunk.
void SamplingCircularQueue::FlushResidualRecords() {
  consumer_pos_->dequeue_chunk_poll_pos = consumer_pos_->dequeue_chunk_pos;
}
} } // namespace v8::internal
...@@ -28,6 +28,8 @@ ...@@ -28,6 +28,8 @@
#ifndef V8_CIRCULAR_QUEUE_H_ #ifndef V8_CIRCULAR_QUEUE_H_
#define V8_CIRCULAR_QUEUE_H_ #define V8_CIRCULAR_QUEUE_H_
#include "v8globals.h"
namespace v8 { namespace v8 {
namespace internal { namespace internal {
...@@ -35,67 +37,50 @@ namespace internal { ...@@ -35,67 +37,50 @@ namespace internal {
// Lock-free cache-friendly sampling circular queue for large // Lock-free cache-friendly sampling circular queue for large
// records. Intended for fast transfer of large records between a // records. Intended for fast transfer of large records between a
// single producer and a single consumer. If the queue is full, // single producer and a single consumer. If the queue is full,
// previous unread records are overwritten. The queue is designed with // StartEnqueue will return NULL. The queue is designed with
// a goal in mind to evade cache lines thrashing by preventing // a goal in mind to evade cache lines thrashing by preventing
// simultaneous reads and writes to adjacent memory locations. // simultaneous reads and writes to adjacent memory locations.
// template<typename T, unsigned Length>
// IMPORTANT: as a producer never checks for chunks cleanness, it is
// possible that it can catch up and overwrite a chunk that a consumer
// is currently reading, resulting in a corrupt record being read.
class SamplingCircularQueue { class SamplingCircularQueue {
public: public:
// Executed on the application thread. // Executed on the application thread.
SamplingCircularQueue(size_t record_size_in_bytes, SamplingCircularQueue();
size_t desired_chunk_size_in_bytes,
unsigned buffer_size_in_chunks);
~SamplingCircularQueue(); ~SamplingCircularQueue();
// Enqueue returns a pointer to a memory location for storing the next // StartEnqueue returns a pointer to a memory location for storing the next
// record. // record or NULL if all entries are full at the moment.
INLINE(void* Enqueue()); T* StartEnqueue();
// Notifies the queue that the producer has completed writing data into the
// memory returned by StartEnqueue and it can be passed to the consumer.
void FinishEnqueue();
// Executed on the consumer (analyzer) thread. // Executed on the consumer (analyzer) thread.
// StartDequeue returns a pointer to a memory location for retrieving // StartDequeue returns a pointer to a memory location for retrieving
// the next record. After the record had been read by a consumer, // the next record. After the record had been read by a consumer,
// FinishDequeue must be called. Until that moment, subsequent calls // FinishDequeue must be called. Until that moment, subsequent calls
// to StartDequeue will return the same pointer. // to StartDequeue will return the same pointer.
void* StartDequeue(); T* StartDequeue();
void FinishDequeue(); void FinishDequeue();
// Due to a presence of slipping between the producer and the consumer,
// the queue must be notified whether producing has been finished in order
// to process remaining records from the buffer.
void FlushResidualRecords();
typedef AtomicWord Cell;
private: private:
// Reserved values for the chunk marker (first Cell in each chunk). // Reserved values for the entry marker.
enum { enum {
kClear, // Marks clean (processed) chunks. kEmpty, // Marks clean (processed) entries.
kEnqueueStarted // Marks chunks where enqueue started. kFull // Marks entries already filled by the producer but not yet
// completely processed by the consumer.
}; };
struct ProducerPosition { struct V8_ALIGNAS(PROCESSOR_CACHE_LINE_SIZE) Entry {
Cell* next_chunk_pos; Entry() : marker(kEmpty) {}
Cell* enqueue_pos; T record;
}; Atomic32 marker;
struct ConsumerPosition {
Cell* dequeue_chunk_pos;
Cell* dequeue_chunk_poll_pos;
Cell* dequeue_pos;
Cell* dequeue_end_pos;
}; };
INLINE(void WrapPositionIfNeeded(Cell** pos)); Entry* Next(Entry* entry);
const size_t record_size_; Entry buffer_[Length];
const size_t chunk_size_in_bytes_; Entry* enqueue_pos_ V8_ALIGNAS(PROCESSOR_CACHE_LINE_SIZE);
const size_t chunk_size_; Entry* dequeue_pos_ V8_ALIGNAS(PROCESSOR_CACHE_LINE_SIZE);
const size_t buffer_size_;
Cell* buffer_;
byte* positions_;
ProducerPosition* producer_pos_;
ConsumerPosition* consumer_pos_;
DISALLOW_COPY_AND_ASSIGN(SamplingCircularQueue); DISALLOW_COPY_AND_ASSIGN(SamplingCircularQueue);
}; };
......
...@@ -67,13 +67,30 @@ void ReportBuiltinEventRecord::UpdateCodeMap(CodeMap* code_map) { ...@@ -67,13 +67,30 @@ void ReportBuiltinEventRecord::UpdateCodeMap(CodeMap* code_map) {
} }
TickSample* ProfilerEventsProcessor::TickSampleEvent() { TickSample* CpuProfiler::StartTickSample() {
if (is_profiling_) return processor_->StartTickSample();
return NULL;
}
// Forwards to the events processor's FinishTickSample().
// NOTE(review): processor_ is used without a check here -- presumably callers
// only invoke this after StartTickSample() returned non-NULL; confirm.
void CpuProfiler::FinishTickSample() {
  processor_->FinishTickSample();
}
TickSample* ProfilerEventsProcessor::StartTickSample() {
void* address = ticks_buffer_.StartEnqueue();
if (address == NULL) return NULL;
TickSampleEventRecord* evt = TickSampleEventRecord* evt =
new(ticks_buffer_.Enqueue()) TickSampleEventRecord(last_code_event_id_); new(address) TickSampleEventRecord(last_code_event_id_);
return &evt->sample; return &evt->sample;
} }
// Completes the enqueue on the ticks buffer by calling its FinishEnqueue().
void ProfilerEventsProcessor::FinishTickSample() {
  ticks_buffer_.FinishEnqueue();
}
} } // namespace v8::internal } } // namespace v8::internal
#endif // V8_CPU_PROFILER_INL_H_ #endif // V8_CPU_PROFILER_INL_H_
...@@ -40,8 +40,6 @@ ...@@ -40,8 +40,6 @@
namespace v8 { namespace v8 {
namespace internal { namespace internal {
static const int kTickSamplesBufferChunkSize = 64 * KB;
static const int kTickSamplesBufferChunksCount = 16;
static const int kProfilerStackSize = 64 * KB; static const int kProfilerStackSize = 64 * KB;
...@@ -49,9 +47,6 @@ ProfilerEventsProcessor::ProfilerEventsProcessor(ProfileGenerator* generator) ...@@ -49,9 +47,6 @@ ProfilerEventsProcessor::ProfilerEventsProcessor(ProfileGenerator* generator)
: Thread(Thread::Options("v8:ProfEvntProc", kProfilerStackSize)), : Thread(Thread::Options("v8:ProfEvntProc", kProfilerStackSize)),
generator_(generator), generator_(generator),
running_(true), running_(true),
ticks_buffer_(sizeof(TickSampleEventRecord),
kTickSamplesBufferChunkSize,
kTickSamplesBufferChunksCount),
last_code_event_id_(0), last_processed_code_event_id_(0) { last_code_event_id_(0), last_processed_code_event_id_(0) {
} }
...@@ -114,23 +109,10 @@ bool ProfilerEventsProcessor::ProcessTicks() { ...@@ -114,23 +109,10 @@ bool ProfilerEventsProcessor::ProcessTicks() {
generator_->RecordTickSample(record.sample); generator_->RecordTickSample(record.sample);
} }
const TickSampleEventRecord* rec = const TickSampleEventRecord* record = ticks_buffer_.StartDequeue();
TickSampleEventRecord::cast(ticks_buffer_.StartDequeue()); if (record == NULL) return !ticks_from_vm_buffer_.IsEmpty();
if (rec == NULL) return !ticks_from_vm_buffer_.IsEmpty(); if (record->order != last_processed_code_event_id_) return true;
// Make a local copy of tick sample record to ensure that it won't generator_->RecordTickSample(record->sample);
// be modified as we are processing it. This is possible as the
// sampler writes w/o any sync to the queue, so if the processor
// will get far behind, a record may be modified right under its
// feet.
TickSampleEventRecord record = *rec;
if (record.order != last_processed_code_event_id_) return true;
// A paranoid check to make sure that we don't get a memory overrun
// in case of frames_count having a wild value.
if (record.sample.frames_count < 0
|| record.sample.frames_count > TickSample::kMaxFramesCount)
record.sample.frames_count = 0;
generator_->RecordTickSample(record.sample);
ticks_buffer_.FinishDequeue(); ticks_buffer_.FinishDequeue();
} }
} }
...@@ -148,7 +130,6 @@ void ProfilerEventsProcessor::Run() { ...@@ -148,7 +130,6 @@ void ProfilerEventsProcessor::Run() {
} }
// Process remaining tick events. // Process remaining tick events.
ticks_buffer_.FlushResidualRecords();
do { do {
ProcessTicks(); ProcessTicks();
} while (ProcessCodeEvent()); } while (ProcessCodeEvent());
...@@ -166,12 +147,6 @@ CpuProfile* CpuProfiler::GetProfile(int index) { ...@@ -166,12 +147,6 @@ CpuProfile* CpuProfiler::GetProfile(int index) {
} }
TickSample* CpuProfiler::TickSampleEvent() {
if (is_profiling_) return processor_->TickSampleEvent();
return NULL;
}
void CpuProfiler::DeleteAllProfiles() { void CpuProfiler::DeleteAllProfiles() {
if (is_profiling_) StopProcessor(); if (is_profiling_) StopProcessor();
ResetProfiles(); ResetProfiles();
......
...@@ -114,10 +114,6 @@ class TickSampleEventRecord { ...@@ -114,10 +114,6 @@ class TickSampleEventRecord {
unsigned order; unsigned order;
TickSample sample; TickSample sample;
static TickSampleEventRecord* cast(void* value) {
return reinterpret_cast<TickSampleEventRecord*>(value);
}
}; };
...@@ -156,7 +152,8 @@ class ProfilerEventsProcessor : public Thread { ...@@ -156,7 +152,8 @@ class ProfilerEventsProcessor : public Thread {
// queue (because the structure is of fixed width, but usually not all // queue (because the structure is of fixed width, but usually not all
// stack frame entries are filled.) This method returns a pointer to the // stack frame entries are filled.) This method returns a pointer to the
// next record of the buffer. // next record of the buffer.
INLINE(TickSample* TickSampleEvent()); inline TickSample* StartTickSample();
inline void FinishTickSample();
private: private:
// Called from events processing thread (Run() method.) // Called from events processing thread (Run() method.)
...@@ -166,7 +163,11 @@ class ProfilerEventsProcessor : public Thread { ...@@ -166,7 +163,11 @@ class ProfilerEventsProcessor : public Thread {
ProfileGenerator* generator_; ProfileGenerator* generator_;
bool running_; bool running_;
UnboundQueue<CodeEventsContainer> events_buffer_; UnboundQueue<CodeEventsContainer> events_buffer_;
SamplingCircularQueue ticks_buffer_; static const size_t kTickSampleBufferSize = 1 * MB;
static const size_t kTickSampleQueueLength =
kTickSampleBufferSize / sizeof(TickSampleEventRecord);
SamplingCircularQueue<TickSampleEventRecord,
kTickSampleQueueLength> ticks_buffer_;
UnboundQueue<TickSampleEventRecord> ticks_from_vm_buffer_; UnboundQueue<TickSampleEventRecord> ticks_from_vm_buffer_;
unsigned last_code_event_id_; unsigned last_code_event_id_;
unsigned last_processed_code_event_id_; unsigned last_processed_code_event_id_;
...@@ -205,7 +206,8 @@ class CpuProfiler : public CodeEventListener { ...@@ -205,7 +206,8 @@ class CpuProfiler : public CodeEventListener {
void DeleteProfile(CpuProfile* profile); void DeleteProfile(CpuProfile* profile);
// Invoked from stack sampler (thread or signal handler.) // Invoked from stack sampler (thread or signal handler.)
TickSample* TickSampleEvent(); inline TickSample* StartTickSample();
inline void FinishTickSample();
// Must be called via PROFILE macro, otherwise will crash when // Must be called via PROFILE macro, otherwise will crash when
// profiling is not enabled. // profiling is not enabled.
......
...@@ -282,6 +282,10 @@ const int kOneByteSize = kCharSize; ...@@ -282,6 +282,10 @@ const int kOneByteSize = kCharSize;
const int kUC16Size = sizeof(uc16); // NOLINT const int kUC16Size = sizeof(uc16); // NOLINT
// Round up n to be a multiple of sz, where sz is a power of 2.
#define ROUND_UP(n, sz) (((n) + ((sz) - 1)) & ~((sz) - 1))
// The expression OFFSET_OF(type, field) computes the byte-offset // The expression OFFSET_OF(type, field) computes the byte-offset
// of the specified field relative to the containing type. This // of the specified field relative to the containing type. This
// corresponds to 'offsetof' (in stddef.h), except that it doesn't // corresponds to 'offsetof' (in stddef.h), except that it doesn't
......
...@@ -65,7 +65,7 @@ ...@@ -65,7 +65,7 @@
#include "v8.h" #include "v8.h"
#include "cpu-profiler.h" #include "cpu-profiler-inl.h"
#include "flags.h" #include "flags.h"
#include "frames-inl.h" #include "frames-inl.h"
#include "log.h" #include "log.h"
...@@ -693,7 +693,7 @@ void Sampler::Stop() { ...@@ -693,7 +693,7 @@ void Sampler::Stop() {
void Sampler::SampleStack(const RegisterState& state) { void Sampler::SampleStack(const RegisterState& state) {
TickSample* sample = isolate_->cpu_profiler()->TickSampleEvent(); TickSample* sample = isolate_->cpu_profiler()->StartTickSample();
TickSample sample_obj; TickSample sample_obj;
if (sample == NULL) sample = &sample_obj; if (sample == NULL) sample = &sample_obj;
sample->Init(isolate_, state); sample->Init(isolate_, state);
...@@ -703,6 +703,9 @@ void Sampler::SampleStack(const RegisterState& state) { ...@@ -703,6 +703,9 @@ void Sampler::SampleStack(const RegisterState& state) {
} }
} }
Tick(sample); Tick(sample);
if (sample != &sample_obj) {
isolate_->cpu_profiler()->FinishTickSample();
}
} }
} } // namespace v8::internal } } // namespace v8::internal
...@@ -97,7 +97,7 @@ const int kPageSizeBits = 20; ...@@ -97,7 +97,7 @@ const int kPageSizeBits = 20;
// On Intel architecture, cache line size is 64 bytes. // On Intel architecture, cache line size is 64 bytes.
// On ARM it may be less (32 bytes), but as far as this constant is // On ARM it may be less (32 bytes), but as far as this constant is
// used for aligning data, it doesn't hurt to align on a greater value. // used for aligning data, it doesn't hurt to align on a greater value.
const int kProcessorCacheLineSize = 64; #define PROCESSOR_CACHE_LINE_SIZE 64
// Constants relevant to double precision floating point numbers. // Constants relevant to double precision floating point numbers.
// If looking only at the top 32 bits, the QNaN mask is bits 19 to 30. // If looking only at the top 32 bits, the QNaN mask is bits 19 to 30.
......
...@@ -35,40 +35,33 @@ using i::SamplingCircularQueue; ...@@ -35,40 +35,33 @@ using i::SamplingCircularQueue;
TEST(SamplingCircularQueue) { TEST(SamplingCircularQueue) {
typedef SamplingCircularQueue::Cell Record; typedef i::AtomicWord Record;
const int kRecordsPerChunk = 4; const int kMaxRecordsInQueue = 4;
SamplingCircularQueue scq(sizeof(Record), SamplingCircularQueue<Record, kMaxRecordsInQueue> scq;
kRecordsPerChunk * sizeof(Record),
3);
// Check that we are using non-reserved values. // Check that we are using non-reserved values.
// Fill up the first chunk. // Fill up the first chunk.
CHECK_EQ(NULL, scq.StartDequeue()); CHECK_EQ(NULL, scq.StartDequeue());
for (Record i = 1; i < 1 + kRecordsPerChunk; ++i) { for (Record i = 1; i < 1 + kMaxRecordsInQueue; ++i) {
Record* rec = reinterpret_cast<Record*>(scq.Enqueue()); Record* rec = reinterpret_cast<Record*>(scq.StartEnqueue());
CHECK_NE(NULL, rec); CHECK_NE(NULL, rec);
*rec = i; *rec = i;
CHECK_EQ(NULL, scq.StartDequeue()); scq.FinishEnqueue();
} }
// Fill up the second chunk. Consumption must still be unavailable. // The queue is full, enqueue is not allowed.
CHECK_EQ(NULL, scq.StartDequeue()); CHECK_EQ(NULL, scq.StartEnqueue());
for (Record i = 10; i < 10 + kRecordsPerChunk; ++i) {
Record* rec = reinterpret_cast<Record*>(scq.Enqueue());
CHECK_NE(NULL, rec);
*rec = i;
CHECK_EQ(NULL, scq.StartDequeue());
}
Record* rec = reinterpret_cast<Record*>(scq.Enqueue()); // Try to enqueue when the queue is full. Consumption must be available.
CHECK_NE(NULL, rec);
*rec = 20;
// Now as we started filling up the third chunk, consumption
// must become possible.
CHECK_NE(NULL, scq.StartDequeue()); CHECK_NE(NULL, scq.StartDequeue());
for (int i = 0; i < 10; ++i) {
Record* rec = reinterpret_cast<Record*>(scq.StartEnqueue());
CHECK_EQ(NULL, rec);
CHECK_NE(NULL, scq.StartDequeue());
}
// Consume the first chunk. // Consume all records.
for (Record i = 1; i < 1 + kRecordsPerChunk; ++i) { for (Record i = 1; i < 1 + kMaxRecordsInQueue; ++i) {
Record* rec = reinterpret_cast<Record*>(scq.StartDequeue()); Record* rec = reinterpret_cast<Record*>(scq.StartDequeue());
CHECK_NE(NULL, rec); CHECK_NE(NULL, rec);
CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec)); CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec));
...@@ -76,16 +69,21 @@ TEST(SamplingCircularQueue) { ...@@ -76,16 +69,21 @@ TEST(SamplingCircularQueue) {
scq.FinishDequeue(); scq.FinishDequeue();
CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue())); CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
} }
// Now consumption must not be possible, as consumer now polls // The queue is empty.
// the first chunk for emptinness. CHECK_EQ(NULL, scq.StartDequeue());
CHECK_EQ(NULL, scq.StartDequeue()); CHECK_EQ(NULL, scq.StartDequeue());
for (Record i = 0; i < kMaxRecordsInQueue / 2; ++i) {
Record* rec = reinterpret_cast<Record*>(scq.StartEnqueue());
CHECK_NE(NULL, rec);
*rec = i;
scq.FinishEnqueue();
}
scq.FlushResidualRecords(); // Consume all available kMaxRecordsInQueue / 2 records.
// From now, consumer no more polls ahead of the current chunk,
// so it's possible to consume the second chunk.
CHECK_NE(NULL, scq.StartDequeue()); CHECK_NE(NULL, scq.StartDequeue());
// Consume the second chunk for (Record i = 0; i < kMaxRecordsInQueue / 2; ++i) {
for (Record i = 10; i < 10 + kRecordsPerChunk; ++i) {
Record* rec = reinterpret_cast<Record*>(scq.StartDequeue()); Record* rec = reinterpret_cast<Record*>(scq.StartDequeue());
CHECK_NE(NULL, rec); CHECK_NE(NULL, rec);
CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec)); CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec));
...@@ -93,19 +91,20 @@ TEST(SamplingCircularQueue) { ...@@ -93,19 +91,20 @@ TEST(SamplingCircularQueue) {
scq.FinishDequeue(); scq.FinishDequeue();
CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue())); CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
} }
// Consumption must still be possible as the first cell of the
// last chunk is not clean. // The queue is empty.
CHECK_NE(NULL, scq.StartDequeue()); CHECK_EQ(NULL, scq.StartDequeue());
} }
namespace { namespace {
typedef i::AtomicWord Record;
typedef SamplingCircularQueue<Record, 12> TestSampleQueue;
class ProducerThread: public i::Thread { class ProducerThread: public i::Thread {
public: public:
typedef SamplingCircularQueue::Cell Record; ProducerThread(TestSampleQueue* scq,
ProducerThread(SamplingCircularQueue* scq,
int records_per_chunk, int records_per_chunk,
Record value, Record value,
i::Semaphore* finished) i::Semaphore* finished)
...@@ -117,16 +116,17 @@ class ProducerThread: public i::Thread { ...@@ -117,16 +116,17 @@ class ProducerThread: public i::Thread {
virtual void Run() { virtual void Run() {
for (Record i = value_; i < value_ + records_per_chunk_; ++i) { for (Record i = value_; i < value_ + records_per_chunk_; ++i) {
Record* rec = reinterpret_cast<Record*>(scq_->Enqueue()); Record* rec = reinterpret_cast<Record*>(scq_->StartEnqueue());
CHECK_NE(NULL, rec); CHECK_NE(NULL, rec);
*rec = i; *rec = i;
scq_->FinishEnqueue();
} }
finished_->Signal(); finished_->Signal();
} }
private: private:
SamplingCircularQueue* scq_; TestSampleQueue* scq_;
const int records_per_chunk_; const int records_per_chunk_;
Record value_; Record value_;
i::Semaphore* finished_; i::Semaphore* finished_;
...@@ -140,17 +140,10 @@ TEST(SamplingCircularQueueMultithreading) { ...@@ -140,17 +140,10 @@ TEST(SamplingCircularQueueMultithreading) {
// to the case of profiling under Linux, where signal handler that // to the case of profiling under Linux, where signal handler that
// does sampling is called in the context of different VM threads. // does sampling is called in the context of different VM threads.
typedef ProducerThread::Record Record;
const int kRecordsPerChunk = 4; const int kRecordsPerChunk = 4;
SamplingCircularQueue scq(sizeof(Record), TestSampleQueue scq;
kRecordsPerChunk * sizeof(Record),
3);
i::Semaphore* semaphore = i::OS::CreateSemaphore(0); i::Semaphore* semaphore = i::OS::CreateSemaphore(0);
// Don't poll ahead, making possible to check data in the buffer
// immediately after enqueuing.
scq.FlushResidualRecords();
// Check that we are using non-reserved values.
ProducerThread producer1(&scq, kRecordsPerChunk, 1, semaphore); ProducerThread producer1(&scq, kRecordsPerChunk, 1, semaphore);
ProducerThread producer2(&scq, kRecordsPerChunk, 10, semaphore); ProducerThread producer2(&scq, kRecordsPerChunk, 10, semaphore);
ProducerThread producer3(&scq, kRecordsPerChunk, 20, semaphore); ProducerThread producer3(&scq, kRecordsPerChunk, 20, semaphore);
......
...@@ -63,7 +63,7 @@ static void EnqueueTickSampleEvent(ProfilerEventsProcessor* proc, ...@@ -63,7 +63,7 @@ static void EnqueueTickSampleEvent(ProfilerEventsProcessor* proc,
i::Address frame1, i::Address frame1,
i::Address frame2 = NULL, i::Address frame2 = NULL,
i::Address frame3 = NULL) { i::Address frame3 = NULL) {
i::TickSample* sample = proc->TickSampleEvent(); i::TickSample* sample = proc->StartTickSample();
sample->pc = frame1; sample->pc = frame1;
sample->tos = frame1; sample->tos = frame1;
sample->frames_count = 0; sample->frames_count = 0;
...@@ -75,6 +75,7 @@ static void EnqueueTickSampleEvent(ProfilerEventsProcessor* proc, ...@@ -75,6 +75,7 @@ static void EnqueueTickSampleEvent(ProfilerEventsProcessor* proc,
sample->stack[1] = frame3; sample->stack[1] = frame3;
sample->frames_count = 2; sample->frames_count = 2;
} }
proc->FinishTickSample();
} }
namespace { namespace {
...@@ -276,13 +277,14 @@ TEST(Issue1398) { ...@@ -276,13 +277,14 @@ TEST(Issue1398) {
profiler.CodeCreateEvent(i::Logger::BUILTIN_TAG, code, "bbb"); profiler.CodeCreateEvent(i::Logger::BUILTIN_TAG, code, "bbb");
i::TickSample* sample = processor.TickSampleEvent(); i::TickSample* sample = processor.StartTickSample();
sample->pc = code->address(); sample->pc = code->address();
sample->tos = 0; sample->tos = 0;
sample->frames_count = i::TickSample::kMaxFramesCount; sample->frames_count = i::TickSample::kMaxFramesCount;
for (int i = 0; i < sample->frames_count; ++i) { for (int i = 0; i < sample->frames_count; ++i) {
sample->stack[i] = code->address(); sample->stack[i] = code->address();
} }
processor.FinishTickSample();
processor.StopSynchronously(); processor.StopSynchronously();
processor.Join(); processor.Join();
......
...@@ -246,7 +246,6 @@ ...@@ -246,7 +246,6 @@
'../../src/checks.cc', '../../src/checks.cc',
'../../src/checks.h', '../../src/checks.h',
'../../src/circular-queue-inl.h', '../../src/circular-queue-inl.h',
'../../src/circular-queue.cc',
'../../src/circular-queue.h', '../../src/circular-queue.h',
'../../src/code-stubs.cc', '../../src/code-stubs.cc',
'../../src/code-stubs.h', '../../src/code-stubs.h',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment