Add multithreading test for SamplingCircularQueue, fix implementation.

This is for the case of Linux, where sampling is done using SIGPROF
signal handler which is executed in the context of an interrupted
thread. In this case, my previous implementation with TLS doesn't
work.

Review URL: http://codereview.chromium.org/1138004

git-svn-id: http://v8.googlecode.com/svn/branches/bleeding_edge@4207 ce2b1a6d-e550-0410-aec6-3dcde31c8c00
parent 82a673b8
......@@ -82,11 +82,10 @@ Record* CircularQueue<Record>::Next(Record* curr) {
void* SamplingCircularQueue::Enqueue() {
Cell* enqueue_pos = reinterpret_cast<Cell*>(
Thread::GetThreadLocal(producer_key_));
WrapPositionIfNeeded(&enqueue_pos);
Thread::SetThreadLocal(producer_key_, enqueue_pos + record_size_);
return enqueue_pos;
WrapPositionIfNeeded(&producer_pos_->enqueue_pos);
void* result = producer_pos_->enqueue_pos;
producer_pos_->enqueue_pos += record_size_;
return result;
}
......
......@@ -52,52 +52,44 @@ SamplingCircularQueue::SamplingCircularQueue(int record_size_in_bytes,
buffer_[i] = kClear;
}
buffer_[buffer_size_] = kEnd;
// Layout producer and consumer position pointers each on their own
// cache lines to avoid cache lines thrashing due to simultaneous
// updates of positions by different processor cores.
const int positions_size =
RoundUp(1, kProcessorCacheLineSize) +
RoundUp(sizeof(ProducerPosition), kProcessorCacheLineSize) +
RoundUp(sizeof(ConsumerPosition), kProcessorCacheLineSize);
positions_ = NewArray<byte>(positions_size);
producer_pos_ = reinterpret_cast<ProducerPosition*>(
RoundUp(positions_, kProcessorCacheLineSize));
producer_pos_->enqueue_pos = buffer_;
consumer_pos_ = reinterpret_cast<ConsumerPosition*>(
reinterpret_cast<byte*>(producer_pos_) + kProcessorCacheLineSize);
ASSERT(reinterpret_cast<byte*>(consumer_pos_ + 1) <=
positions_ + positions_size);
consumer_pos_->dequeue_chunk_pos = buffer_;
consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_;
consumer_pos_->dequeue_pos = NULL;
}
// Releases the queue's heap storage. positions_ is the raw byte array
// that backs the cache-line-aligned producer_pos_/consumer_pos_ structs,
// so freeing it releases both position records.
SamplingCircularQueue::~SamplingCircularQueue() {
DeleteArray(positions_);
DeleteArray(buffer_);
}
void SamplingCircularQueue::SetUpProducer() {
producer_key_ = Thread::CreateThreadLocalKey();
Thread::SetThreadLocal(producer_key_, buffer_);
}
void SamplingCircularQueue::TearDownProducer() {
Thread::DeleteThreadLocalKey(producer_key_);
}
void SamplingCircularQueue::SetUpConsumer() {
consumer_key_ = Thread::CreateThreadLocalKey();
ConsumerPosition* cp = new ConsumerPosition;
cp->dequeue_chunk_pos = buffer_;
cp->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_;
cp->dequeue_pos = NULL;
Thread::SetThreadLocal(consumer_key_, cp);
}
void SamplingCircularQueue::TearDownConsumer() {
delete reinterpret_cast<ConsumerPosition*>(
Thread::GetThreadLocal(consumer_key_));
Thread::DeleteThreadLocalKey(consumer_key_);
}
void* SamplingCircularQueue::StartDequeue() {
ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
Thread::GetThreadLocal(consumer_key_));
if (cp->dequeue_pos != NULL) {
return cp->dequeue_pos;
if (consumer_pos_->dequeue_pos != NULL) {
return consumer_pos_->dequeue_pos;
} else {
if (*cp->dequeue_chunk_poll_pos != kClear) {
cp->dequeue_pos = cp->dequeue_chunk_pos;
cp->dequeue_end_pos = cp->dequeue_pos + chunk_size_;
return cp->dequeue_pos;
if (*consumer_pos_->dequeue_chunk_poll_pos != kClear) {
consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos;
consumer_pos_->dequeue_end_pos = consumer_pos_->dequeue_pos + chunk_size_;
return consumer_pos_->dequeue_pos;
} else {
return NULL;
}
......@@ -106,25 +98,21 @@ void* SamplingCircularQueue::StartDequeue() {
void SamplingCircularQueue::FinishDequeue() {
ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
Thread::GetThreadLocal(consumer_key_));
cp->dequeue_pos += record_size_;
if (cp->dequeue_pos < cp->dequeue_end_pos) return;
consumer_pos_->dequeue_pos += record_size_;
if (consumer_pos_->dequeue_pos < consumer_pos_->dequeue_end_pos) return;
// Move to next chunk.
cp->dequeue_pos = NULL;
*cp->dequeue_chunk_pos = kClear;
cp->dequeue_chunk_pos += chunk_size_;
WrapPositionIfNeeded(&cp->dequeue_chunk_pos);
cp->dequeue_chunk_poll_pos += chunk_size_;
WrapPositionIfNeeded(&cp->dequeue_chunk_poll_pos);
consumer_pos_->dequeue_pos = NULL;
*consumer_pos_->dequeue_chunk_pos = kClear;
consumer_pos_->dequeue_chunk_pos += chunk_size_;
WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_pos);
consumer_pos_->dequeue_chunk_poll_pos += chunk_size_;
WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_poll_pos);
}
void SamplingCircularQueue::FlushResidualRecords() {
ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
Thread::GetThreadLocal(consumer_key_));
// Eliminate producer / consumer distance.
cp->dequeue_chunk_poll_pos = cp->dequeue_chunk_pos;
consumer_pos_->dequeue_chunk_poll_pos = consumer_pos_->dequeue_chunk_pos;
}
......
......@@ -76,15 +76,11 @@ class SamplingCircularQueue {
int buffer_size_in_chunks);
~SamplingCircularQueue();
// Executed on the producer (sampler) or application thread.
void SetUpProducer();
// Enqueue returns a pointer to a memory location for storing the next
// record.
INLINE(void* Enqueue());
void TearDownProducer();
// Executed on the consumer (analyzer) thread.
void SetUpConsumer();
// StartDequeue returns a pointer to a memory location for retrieving
// the next record. After the record had been read by a consumer,
// FinishDequeue must be called. Until that moment, subsequent calls
......@@ -95,7 +91,6 @@ class SamplingCircularQueue {
// the queue must be notified whether producing has been finished in order
// to process remaining records from the buffer.
void FlushResidualRecords();
void TearDownConsumer();
typedef AtomicWord Cell;
// Reserved values for the first cell of a record.
......@@ -103,6 +98,9 @@ class SamplingCircularQueue {
static const Cell kEnd = -1; // Marks the end of the buffer.
private:
struct ProducerPosition {
Cell* enqueue_pos;
};
struct ConsumerPosition {
Cell* dequeue_chunk_pos;
Cell* dequeue_chunk_poll_pos;
......@@ -118,10 +116,9 @@ class SamplingCircularQueue {
const int buffer_size_;
const int producer_consumer_distance_;
Cell* buffer_;
// Store producer and consumer data in TLS to avoid modifying the
// same CPU cache line from two threads simultaneously.
Thread::LocalStorageKey consumer_key_;
Thread::LocalStorageKey producer_key_;
byte* positions_;
ProducerPosition* producer_pos_;
ConsumerPosition* consumer_pos_;
};
......
......@@ -176,7 +176,6 @@ bool ProfilerEventsProcessor::ProcessTicks(unsigned dequeue_order) {
void ProfilerEventsProcessor::Run() {
ticks_buffer_.SetUpConsumer();
unsigned dequeue_order = 0;
running_ = true;
......@@ -194,7 +193,6 @@ void ProfilerEventsProcessor::Run() {
ticks_buffer_.FlushResidualRecords();
// Perform processing until we have tick events, skip remaining code events.
while (ProcessTicks(dequeue_order) && ProcessCodeEvent(&dequeue_order)) { }
ticks_buffer_.TearDownConsumer();
}
......
......@@ -154,14 +154,11 @@ class ProfilerEventsProcessor : public Thread {
void FunctionMoveEvent(Address from, Address to);
void FunctionDeleteEvent(Address from);
// Tick sampler registration. Called by sampler thread or signal handler.
inline void SetUpSamplesProducer() { ticks_buffer_.SetUpProducer(); }
// Tick sample events are filled directly in the buffer of the circular
// queue (because the structure is of fixed width, but usually not all
// stack frame entries are filled.) This method returns a pointer to the
// next record of the buffer.
INLINE(TickSample* TickSampleEvent());
inline void TearDownSamplesProducer() { ticks_buffer_.TearDownProducer(); }
private:
union CodeEventsContainer {
......
......@@ -195,6 +195,10 @@ const Address kFromSpaceZapValue = reinterpret_cast<Address>(0xbeefdad);
// gives 8K bytes per page.
const int kPageSizeBits = 13;
// On Intel architecture, cache line size is 64 bytes.
// On ARM it may be less (32 bytes), but as far as this constant is
// used for aligning data, it doesn't hurt to align on a greater value.
const int kProcessorCacheLineSize = 64;
// Constants relevant to double precision floating point numbers.
......
......@@ -61,8 +61,6 @@ TEST(SamplingCircularQueue) {
SamplingCircularQueue scq(sizeof(Record),
kRecordsPerChunk * sizeof(Record),
3);
scq.SetUpProducer();
scq.SetUpConsumer();
// Check that we are using non-reserved values.
CHECK_NE(SamplingCircularQueue::kClear, 1);
......@@ -121,7 +119,103 @@ TEST(SamplingCircularQueue) {
// Consumption must still be possible as the first cell of the
// last chunk is not clean.
CHECK_NE(NULL, scq.StartDequeue());
}
namespace {

// Test helper: a thread that enqueues records_per_chunk consecutive
// Record values, starting at `value`, into the shared queue, then
// signals `finished`. Used to emulate SIGPROF-style sampling that runs
// in the context of different VM threads (see the test below).
class ProducerThread: public i::Thread {
public:
typedef SamplingCircularQueue::Cell Record;
ProducerThread(SamplingCircularQueue* scq,
int records_per_chunk,
Record value,
i::Semaphore* finished)
: scq_(scq),
records_per_chunk_(records_per_chunk),
value_(value),
finished_(finished) { }
virtual void Run() {
// Fill exactly one chunk with the values [value_, value_ + records_per_chunk_).
for (Record i = value_; i < value_ + records_per_chunk_; ++i) {
Record* rec = reinterpret_cast<Record*>(scq_->Enqueue());
CHECK_NE(NULL, rec);
*rec = i;
}
// Let the consumer (the test body) know this producer is done.
finished_->Signal();
}
private:
SamplingCircularQueue* scq_;  // Not owned; shared with the test body.
const int records_per_chunk_;
Record value_;                // First value to enqueue.
i::Semaphore* finished_;      // Not owned; signaled once after enqueuing.
};
} // namespace
TEST(SamplingCircularQueueMultithreading) {
// Emulate multiple VM threads working 'one thread at a time.'
// This test enqueues data from different threads. This corresponds
// to the case of profiling under Linux, where the signal handler that
// does sampling is called in the context of different VM threads.
typedef ProducerThread::Record Record;
const int kRecordsPerChunk = 4;
SamplingCircularQueue scq(sizeof(Record),
kRecordsPerChunk * sizeof(Record),
3);
i::Semaphore* semaphore = i::OS::CreateSemaphore(0);
// Don't poll ahead, making it possible to check data in the buffer
// immediately after enqueuing.
scq.FlushResidualRecords();
// Check that we are using non-reserved values.
CHECK_NE(SamplingCircularQueue::kClear, 1);
CHECK_NE(SamplingCircularQueue::kEnd, 1);
// Three producers, each filling one chunk with a distinct value range,
// all signaling the same semaphore when done.
ProducerThread producer1(&scq, kRecordsPerChunk, 1, semaphore);
ProducerThread producer2(&scq, kRecordsPerChunk, 10, semaphore);
ProducerThread producer3(&scq, kRecordsPerChunk, 20, semaphore);
// Queue starts empty.
CHECK_EQ(NULL, scq.StartDequeue());
// Stage 1: producer1 fills a chunk with 1..4; drain and verify order.
producer1.Start();
semaphore->Wait();
for (Record i = 1; i < 1 + kRecordsPerChunk; ++i) {
Record* rec = reinterpret_cast<Record*>(scq.StartDequeue());
CHECK_NE(NULL, rec);
CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec));
// StartDequeue is idempotent until FinishDequeue is called.
CHECK_EQ(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
scq.FinishDequeue();
CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
}
CHECK_EQ(NULL, scq.StartDequeue());
// Stage 2: producer2 fills the next chunk with 10..13.
producer2.Start();
semaphore->Wait();
for (Record i = 10; i < 10 + kRecordsPerChunk; ++i) {
Record* rec = reinterpret_cast<Record*>(scq.StartDequeue());
CHECK_NE(NULL, rec);
CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec));
CHECK_EQ(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
scq.FinishDequeue();
CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
}
CHECK_EQ(NULL, scq.StartDequeue());
// Stage 3: producer3 fills the last chunk with 20..23.
producer3.Start();
semaphore->Wait();
for (Record i = 20; i < 20 + kRecordsPerChunk; ++i) {
Record* rec = reinterpret_cast<Record*>(scq.StartDequeue());
CHECK_NE(NULL, rec);
CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec));
CHECK_EQ(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
scq.FinishDequeue();
CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
}
CHECK_EQ(NULL, scq.StartDequeue());
// NOTE(review): the two TearDown* calls below reference API this very
// commit removes (producer/consumer TLS setup/teardown). They appear to
// be residual deleted-side diff lines from the scrape — confirm against
// the actual post-commit source before treating them as live code.
scq.TearDownConsumer();
scq.TearDownProducer();
delete semaphore;
}
......@@ -64,7 +64,6 @@ TEST(CodeEvents) {
ProfileGenerator generator(&profiles);
ProfilerEventsProcessor processor(&generator);
processor.Start();
processor.SetUpSamplesProducer();
while (!processor.running()) {
i::Thread::YieldCPU();
}
......@@ -117,8 +116,6 @@ TEST(CodeEvents) {
CodeEntry* entry5 = generator.code_map()->FindEntry(ToAddress(0x1700));
CHECK_NE(NULL, entry5);
CHECK_EQ(aaa_str, entry5->name());
processor.TearDownSamplesProducer();
}
......@@ -133,7 +130,6 @@ TEST(TickEvents) {
ProfileGenerator generator(&profiles);
ProfilerEventsProcessor processor(&generator);
processor.Start();
processor.SetUpSamplesProducer();
while (!processor.running()) {
i::Thread::YieldCPU();
}
......@@ -197,6 +193,4 @@ TEST(TickEvents) {
bottom_up_ddd_children.last()->GetChildren(&bottom_up_ddd_stub_children);
CHECK_EQ(1, bottom_up_ddd_stub_children.length());
CHECK_EQ("bbb", bottom_up_ddd_stub_children.last()->entry()->name());
processor.TearDownSamplesProducer();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment