Commit 97681be4 authored by yurys@chromium.org

Fix data race in SamplingCircularQueue

This change fixes the data race described in the bug by adding an Acquire_Load to SamplingCircularQueue::StartDequeue and an Acquire_Store to SamplingCircularQueue::Enqueue.
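
For illustration, here is a minimal sketch of the ordering this pairing establishes, written with std::atomic instead of V8's internal atomicops (Acquire_Store/Acquire_Load); the chunk array, names, and sizes below are simplified stand-ins for the example, not the actual V8 types:

#include <atomic>
#include <cstddef>
#include <cstdint>

typedef intptr_t Cell;  // Stands in for v8::internal::AtomicWord.
enum : Cell { kClear = 0, kEnqueueStarted = 1 };

static const std::size_t kChunkCount = 4;
// Static storage, so every marker starts zero-initialized, i.e. kClear.
static std::atomic<Cell> chunk_marker[kChunkCount];

// Producer: mark a chunk before filling it. The release store makes all
// records written to earlier chunks visible to a consumer that observes
// this marker.
void StartEnqueueChunk(std::size_t chunk) {
  chunk_marker[chunk].store(kEnqueueStarted, std::memory_order_release);
}

// Consumer: poll the marker of a chunk a safe distance ahead of the
// dequeue position. The acquire load pairs with the store above, so a
// non-kClear result means the earlier chunks can be read without a race.
bool ChunkReadyBehind(std::size_t poll_chunk) {
  return chunk_marker[poll_chunk].load(std::memory_order_acquire) != kClear;
}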

The queue implementation also imposed a constraint on the records it stored: the first AtomicWord in each record served as a marker. For that purpose TickSampleEventRecord had a filler field of type int. This approach is error-prone; e.g. on x64, sizeof(AtomicWord) is 8 while sizeof(int) is 4. Moreover, the queue only needs such a marker at the beginning of each chunk. I changed the queue so that it stores the marker explicitly as the first Cell of each chunk, and removed the filler field.
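
To make the sizing pitfall and the new per-chunk layout concrete, a small standalone sketch (the record and chunk sizes are made up for the example; only the sizeof relationships mirror the patch):

#include <cstddef>
#include <cstdint>
#include <cstdio>

typedef intptr_t AtomicWord;  // 8 bytes on x64, 4 bytes on ia32.
typedef AtomicWord Cell;

int main() {
  // The old scheme relied on an int filler overlaying the first AtomicWord
  // of each record -- but on x64 an int covers only half of that word.
  std::printf("sizeof(int) = %zu, sizeof(AtomicWord) = %zu\n",
              sizeof(int), sizeof(AtomicWord));

  // New scheme: one explicit marker Cell at the start of each chunk, as in
  // the constructor change below:
  //   chunk_size_in_bytes_ = desired / record_size * record_size + sizeof(Cell)
  const std::size_t record_size_in_bytes = 3 * sizeof(Cell);
  const std::size_t desired_chunk_size_in_bytes = 16 * sizeof(Cell);
  const std::size_t chunk_size_in_bytes =
      desired_chunk_size_in_bytes / record_size_in_bytes * record_size_in_bytes
      + sizeof(Cell);
  std::printf("chunk: %zu records + 1 marker cell = %zu bytes\n",
              desired_chunk_size_in_bytes / record_size_in_bytes,
              chunk_size_in_bytes);
  return 0;
}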

BUG=251218
R=loislo@chromium.org, yangguo@chromium.org

Review URL: https://codereview.chromium.org/19642002

git-svn-id: http://v8.googlecode.com/svn/branches/bleeding_edge@15750 ce2b1a6d-e550-0410-aec6-3dcde31c8c00
parent 693354a6
src/circular-queue-inl.h
@@ -35,7 +35,16 @@ namespace internal {
 void* SamplingCircularQueue::Enqueue() {
-  WrapPositionIfNeeded(&producer_pos_->enqueue_pos);
+  if (producer_pos_->enqueue_pos == producer_pos_->next_chunk_pos) {
+    if (producer_pos_->enqueue_pos == buffer_ + buffer_size_) {
+      producer_pos_->next_chunk_pos = buffer_;
+      producer_pos_->enqueue_pos = buffer_;
+    }
+    Acquire_Store(producer_pos_->next_chunk_pos, kEnqueueStarted);
+    // Skip marker.
+    producer_pos_->enqueue_pos += 1;
+    producer_pos_->next_chunk_pos += chunk_size_;
+  }
   void* result = producer_pos_->enqueue_pos;
   producer_pos_->enqueue_pos += record_size_;
   return result;
@@ -44,7 +53,7 @@ void* SamplingCircularQueue::Enqueue() {
 void SamplingCircularQueue::WrapPositionIfNeeded(
     SamplingCircularQueue::Cell** pos) {
-  if (**pos == kEnd) *pos = buffer_;
+  if (*pos == buffer_ + buffer_size_) *pos = buffer_;
 }
src/circular-queue.cc
@@ -33,26 +33,22 @@ namespace v8 {
 namespace internal {
-SamplingCircularQueue::SamplingCircularQueue(int record_size_in_bytes,
-                                             int desired_chunk_size_in_bytes,
+SamplingCircularQueue::SamplingCircularQueue(size_t record_size_in_bytes,
+                                             size_t desired_chunk_size_in_bytes,
                                              int buffer_size_in_chunks)
     : record_size_(record_size_in_bytes / sizeof(Cell)),
       chunk_size_in_bytes_(desired_chunk_size_in_bytes / record_size_in_bytes *
-                           record_size_in_bytes),
+                           record_size_in_bytes + sizeof(Cell)),
       chunk_size_(chunk_size_in_bytes_ / sizeof(Cell)),
       buffer_size_(chunk_size_ * buffer_size_in_chunks),
-      // The distance ensures that producer and consumer never step on
-      // each other's chunks and helps eviction of produced data from
-      // the CPU cache (having that chunk size is bigger than the cache.)
-      producer_consumer_distance_(2 * chunk_size_),
-      buffer_(NewArray<Cell>(buffer_size_ + 1)) {
+      buffer_(NewArray<Cell>(buffer_size_)) {
   ASSERT(record_size_ * sizeof(Cell) == record_size_in_bytes);
   ASSERT(chunk_size_ * sizeof(Cell) == chunk_size_in_bytes_);
   ASSERT(buffer_size_in_chunks > 2);
-  // Clean up the whole buffer to avoid encountering a random kEnd
-  // while enqueuing.
-  for (int i = 0; i < buffer_size_; ++i) {
+  // Mark all chunks as clear.
+  for (int i = 0; i < buffer_size_; i += chunk_size_) {
     buffer_[i] = kClear;
   }
-  buffer_[buffer_size_] = kEnd;
   // Layout producer and consumer position pointers each on their own
   // cache lines to avoid cache lines thrashing due to simultaneous
@@ -67,6 +63,7 @@ SamplingCircularQueue::SamplingCircularQueue(int record_size_in_bytes,
   producer_pos_ = reinterpret_cast<ProducerPosition*>(
       RoundUp(positions_, kProcessorCacheLineSize));
+  producer_pos_->next_chunk_pos = buffer_;
   producer_pos_->enqueue_pos = buffer_;
   consumer_pos_ = reinterpret_cast<ConsumerPosition*>(
@@ -74,7 +71,11 @@ SamplingCircularQueue::SamplingCircularQueue(int record_size_in_bytes,
   ASSERT(reinterpret_cast<byte*>(consumer_pos_ + 1) <=
          positions_ + positions_size);
   consumer_pos_->dequeue_chunk_pos = buffer_;
-  consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_;
+  // The distance ensures that producer and consumer never step on
+  // each other's chunks and helps eviction of produced data from
+  // the CPU cache (having that chunk size is bigger than the cache.)
+  const int producer_consumer_distance = (2 * chunk_size_);
+  consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance;
   consumer_pos_->dequeue_pos = NULL;
 }
@@ -89,9 +90,11 @@ void* SamplingCircularQueue::StartDequeue() {
   if (consumer_pos_->dequeue_pos != NULL) {
     return consumer_pos_->dequeue_pos;
   } else {
-    if (*consumer_pos_->dequeue_chunk_poll_pos != kClear) {
-      consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos;
-      consumer_pos_->dequeue_end_pos = consumer_pos_->dequeue_pos + chunk_size_;
+    if (Acquire_Load(consumer_pos_->dequeue_chunk_poll_pos) != kClear) {
+      // Skip marker.
+      consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos + 1;
+      consumer_pos_->dequeue_end_pos =
+          consumer_pos_->dequeue_chunk_pos + chunk_size_;
       return consumer_pos_->dequeue_pos;
     } else {
       return NULL;
src/circular-queue.h
@@ -45,8 +45,8 @@ namespace internal {
 class SamplingCircularQueue {
  public:
   // Executed on the application thread.
-  SamplingCircularQueue(int record_size_in_bytes,
-                        int desired_chunk_size_in_bytes,
+  SamplingCircularQueue(size_t record_size_in_bytes,
+                        size_t desired_chunk_size_in_bytes,
                         int buffer_size_in_chunks);
   ~SamplingCircularQueue();
@@ -67,12 +67,16 @@ class SamplingCircularQueue {
   void FlushResidualRecords();
   typedef AtomicWord Cell;
-  // Reserved values for the first cell of a record.
-  static const Cell kClear = 0;  // Marks clean (processed) chunks.
-  static const Cell kEnd = -1;   // Marks the end of the buffer.
  private:
+  // Reserved values for the chunk marker (first Cell in each chunk).
+  enum {
+    kClear,          // Marks clean (processed) chunks.
+    kEnqueueStarted  // Marks chunks where enqueue started.
+  };
   struct ProducerPosition {
+    Cell* next_chunk_pos;
     Cell* enqueue_pos;
   };
   struct ConsumerPosition {
@@ -85,10 +89,9 @@ class SamplingCircularQueue {
   INLINE(void WrapPositionIfNeeded(Cell** pos));
   const int record_size_;
-  const int chunk_size_in_bytes_;
+  const size_t chunk_size_in_bytes_;
   const int chunk_size_;
   const int buffer_size_;
-  const int producer_consumer_distance_;
   Cell* buffer_;
   byte* positions_;
   ProducerPosition* producer_pos_;
src/profile-generator.h
@@ -110,18 +110,8 @@ class TickSampleEventRecord {
   // The parameterless constructor is used when we dequeue data from
   // the ticks buffer.
   TickSampleEventRecord() { }
-  explicit TickSampleEventRecord(unsigned order)
-      : filler(1),
-        order(order) {
-    ASSERT(filler != SamplingCircularQueue::kClear);
-  }
+  explicit TickSampleEventRecord(unsigned order) : order(order) { }
-  // The first machine word of a TickSampleEventRecord must not ever
-  // become equal to SamplingCircularQueue::kClear. As both order and
-  // TickSample's first field are not reliable in this sense (order
-  // can overflow, TickSample can have all fields reset), we are
-  // forced to use an artificial filler field.
-  int filler;
   unsigned order;
   TickSample sample;
test/cctest/test-circular-queue.cc
@@ -42,8 +42,6 @@ TEST(SamplingCircularQueue) {
       3);
-  // Check that we are using non-reserved values.
-  CHECK_NE(SamplingCircularQueue::kClear, 1);
-  CHECK_NE(SamplingCircularQueue::kEnd, 1);
   // Fill up the first chunk.
   CHECK_EQ(NULL, scq.StartDequeue());
   for (Record i = 1; i < 1 + kRecordsPerChunk; ++i) {
@@ -153,8 +151,6 @@ TEST(SamplingCircularQueueMultithreading) {
   scq.FlushResidualRecords();
-  // Check that we are using non-reserved values.
-  CHECK_NE(SamplingCircularQueue::kClear, 1);
-  CHECK_NE(SamplingCircularQueue::kEnd, 1);
   ProducerThread producer1(&scq, kRecordsPerChunk, 1, semaphore);
   ProducerThread producer2(&scq, kRecordsPerChunk, 10, semaphore);
   ProducerThread producer3(&scq, kRecordsPerChunk, 20, semaphore);