// Cache-line width in bytes assumed by this file; it sizes the cachelineFiller
// padding arrays declared inside Block further down.
29 #define CACHE_LINE_SIZE 64
// MSVC warning suppressions. Per Microsoft's warning reference: C4324 = structure was
// padded due to an alignment specifier, C4820 = padding added after a data member,
// C4127 = conditional expression is constant.
33 #pragma warning(disable: 4324)
34 #pragma warning(disable: 4820)
35 #pragma warning(disable: 4127)
// Template header of the queue class (the class line itself is not visible in this listing).
//   T              - element type stored in the blocks.
//   MAX_BLOCK_SIZE - cap on the element count of a single block (default 512); the
//                    constructor asserts it is a power of two and at least 2.
40 template<
typename T,
size_t MAX_BLOCK_SIZE = 512>
// Constructor body (fragmentary listing: the signature, closing braces and some
// statements are not visible here).
// Validate the template argument: it must equal its own ceilToPow2() and be >= 2.
75 assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) &&
"MAX_BLOCK_SIZE must be a power of 2");
76 assert(MAX_BLOCK_SIZE >= 2 &&
"MAX_BLOCK_SIZE must be at least 2");
78 Block* firstBlock =
nullptr;
// Size the first block from maxSize + 1: the enqueue path treats a block as full when
// (tail + 1) & sizeMask == front, so one slot per block always stays unused.
80 largestBlockSize = ceilToPow2(maxSize + 1);
// The requested capacity exceeds twice the per-block cap: build a chain of
// MAX_BLOCK_SIZE-sized blocks instead of one large block.
81 if (largestBlockSize > MAX_BLOCK_SIZE * 2) {
// Block count = ceil(maxSize / (MAX_BLOCK_SIZE - 1)) + 1, where MAX_BLOCK_SIZE - 1 is the
// number of usable slots per block. NOTE(review): the reason for the extra block is not
// visible in this listing — confirm against the original comments.
87 size_t initialBlockCount = (maxSize + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
88 largestBlockSize = MAX_BLOCK_SIZE;
89 Block* lastBlock =
nullptr;
90 for (
size_t i = 0; i != initialBlockCount; ++i) {
91 auto block = make_block(largestBlockSize);
// make_block reports allocation failure with nullptr; surface it as std::bad_alloc.
92 if (block ==
nullptr) {
93 throw std::bad_alloc();
95 if (firstBlock ==
nullptr) {
// Append the new block after the previous one ...
99 lastBlock->next = block;
// ... and point it back at the first block so the chain stays circular.
102 block->next = firstBlock;
// Single-block case: one block of largestBlockSize elements that links to itself.
106 firstBlock = make_block(largestBlockSize);
107 if (firstBlock ==
nullptr) {
108 throw std::bad_alloc();
110 firstBlock->next = firstBlock;
// Both the front and the tail of the queue start out on the first block.
112 frontBlock = firstBlock;
113 tailBlock = firstBlock;
// Destructor body (fragmentary listing). Walks the circular block list once,
// starting and ending at frontBlock.
127 Block* frontBlock_ = frontBlock;
128 Block* block = frontBlock_;
// Remember the successor and the occupied range before handling this block.
130 Block* nextBlock = block->next;
131 size_t blockFront = block->front;
132 size_t blockTail = block->tail;
// Visit every occupied slot from front to tail, wrapping the index with sizeMask.
134 for (
size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask) {
// Address of the i-th element slot. NOTE(review): what is done with `element` is not
// visible here — presumably the element is destroyed; confirm.
135 auto element =
reinterpret_cast<T*
>(block->data + i *
sizeof(T));
// rawThis is the pointer make_block obtained from std::malloc. NOTE(review): its use is
// not visible here — presumably the block is destroyed and the allocation freed; confirm.
140 auto rawBlock = block->rawThis;
144 }
while (block != frontBlock_);
// Bodies of the four public enqueue wrappers; each one defers to inner_enqueue.
// try_enqueue (copy and move overloads): CannotAlloc — inner_enqueue only calls
// make_block in its CanAlloc branch, so these never allocate a new block.
153 return inner_enqueue<CannotAlloc>(element);
161 return inner_enqueue<CannotAlloc>(std::forward<T>(element));
// enqueue (copy and move overloads): CanAlloc — a new block may be allocated when no
// existing block has room.
170 return inner_enqueue<CanAlloc>(element);
178 return inner_enqueue<CanAlloc>(std::forward<T>(element));
// try_dequeue body (fragmentary listing: signature, fences, returns and closing braces are
// not visible). Moves the front element into `result` and advances the front index.
// Guard on the `dequeuing` flag; see ReentrantGuard.
189 ReentrantGuard guard(this->dequeuing);
209 Block* frontBlock_ = frontBlock.
load();
// localTail is a cached copy of tail; it is only refreshed from the atomic when needed.
210 size_t blockTail = frontBlock_->localTail;
211 size_t blockFront = frontBlock_->front.load();
// Non-empty if the cached tail already differs from front, or still differs after the
// cache has been refreshed from tail.load().
213 if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
216 non_empty_front_block:
// Slot address = block data + index * sizeof(T); move the element out to the caller.
218 auto element =
reinterpret_cast<T*
>(frontBlock_->data + blockFront *
sizeof(T));
219 result = std::move(*element);
// Advance front by one slot, wrapping with sizeMask, and store the new value.
222 blockFront = (blockFront + 1) & frontBlock_->sizeMask;
225 frontBlock_->front = blockFront;
// The front block looked empty but it is not the tail block, so later blocks may hold data.
227 else if (frontBlock_ != tailBlock.
load()) {
// Re-read the front block and its indices before giving up on it.
230 frontBlock_ = frontBlock.
load();
231 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
232 blockFront = frontBlock_->front.load();
// An element showed up in the front block after all: take the normal dequeue path.
235 if (blockFront != blockTail) {
237 goto non_empty_front_block;
// The front block is empty: move on to the next block in the ring.
241 Block* nextBlock = frontBlock_->next;
246 size_t nextBlockFront = nextBlock->front.load();
247 size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
// The next block is expected to contain at least one element at this point.
252 assert(nextBlockFront != nextBlockTail);
// Make the next block the new front block.
257 frontBlock = frontBlock_ = nextBlock;
261 auto element =
reinterpret_cast<T*
>(frontBlock_->data + nextBlockFront *
sizeof(T));
263 result = std::move(*element);
266 nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
269 frontBlock_->front = nextBlockFront;
// peek body (fragmentary listing). Returns a pointer to the front element's slot; none of
// the visible statements modify front or frontBlock. NOTE(review): the return value for an
// empty queue is not visible in this listing.
288 ReentrantGuard guard(this->dequeuing);
292 Block* frontBlock_ = frontBlock.
load();
293 size_t blockTail = frontBlock_->localTail;
294 size_t blockFront = frontBlock_->front.load();
// Non-empty if the cached tail differs from front, or differs after refreshing the cache.
296 if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
298 non_empty_front_block:
299 return reinterpret_cast<T*
>(frontBlock_->data + blockFront *
sizeof(T));
// The front block looked empty but is not the tail block: re-read it, then look further.
301 else if (frontBlock_ != tailBlock.
load()) {
303 frontBlock_ = frontBlock.
load();
304 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
305 blockFront = frontBlock_->front.load();
308 if (blockFront != blockTail) {
309 goto non_empty_front_block;
// The front block is empty: the front element lives in the next block of the ring.
312 Block* nextBlock = frontBlock_->next;
314 size_t nextBlockFront = nextBlock->front.load();
// The next block is expected to be non-empty here.
317 assert(nextBlockFront != nextBlock->tail.load());
318 return reinterpret_cast<T*
>(nextBlock->data + nextBlockFront *
sizeof(T));
// pop body (fragmentary listing). Follows the same path as try_dequeue but has no output
// parameter: it locates the front element and advances front past it.
330 ReentrantGuard guard(this->dequeuing);
334 Block* frontBlock_ = frontBlock.
load();
335 size_t blockTail = frontBlock_->localTail;
336 size_t blockFront = frontBlock_->front.load();
// Non-empty if the cached tail differs from front, or differs after refreshing the cache.
338 if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
341 non_empty_front_block:
// NOTE(review): what is done with `element` is not visible in this listing — presumably
// its destructor is run; confirm.
342 auto element =
reinterpret_cast<T*
>(frontBlock_->data + blockFront *
sizeof(T));
// Advance front by one slot, wrapping with sizeMask, and store the new value.
345 blockFront = (blockFront + 1) & frontBlock_->sizeMask;
348 frontBlock_->front = blockFront;
// The front block looked empty but is not the tail block: re-read it, then look further.
350 else if (frontBlock_ != tailBlock.
load()) {
352 frontBlock_ = frontBlock.
load();
353 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
354 blockFront = frontBlock_->front.load();
357 if (blockFront != blockTail) {
358 goto non_empty_front_block;
// The front block is empty: continue with the next block in the ring.
362 Block* nextBlock = frontBlock_->next;
364 size_t nextBlockFront = nextBlock->front.load();
365 size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
// The next block is expected to be non-empty here.
368 assert(nextBlockFront != nextBlockTail);
// Make the next block the new front block and drop its first element.
372 frontBlock = frontBlock_ = nextBlock;
376 auto element =
reinterpret_cast<T*
>(frontBlock_->data + nextBlockFront *
sizeof(T));
379 nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
382 frontBlock_->front = nextBlockFront;
// size_approx body (fragmentary listing). Sums the occupied slot count of every block in
// the ring, starting and ending at frontBlock.
397 Block* frontBlock_ = frontBlock.
load();
398 Block* block = frontBlock_;
401 size_t blockFront = block->front.load();
402 size_t blockTail = block->tail.load();
// Occupied slots in this block; the mask keeps the difference correct when tail has
// wrapped around below front.
403 result += (blockTail - blockFront) & block->sizeMask;
404 block = block->next.load();
405 }
while (block != frontBlock_);
// Template argument for inner_enqueue: CanAlloc permits allocating a new block when the
// existing ones are full; CannotAlloc does not.
411 enum AllocationMode { CanAlloc, CannotAlloc };
// Shared implementation behind enqueue/try_enqueue (fragmentary listing: braces, fences and
// return statements are not visible).
//   canAlloc - whether a new block may be allocated when no existing block has room.
//   element  - value to store; forwarded into a placement-new of T.
// Returns bool. NOTE(review): the individual return statements are not visible here.
413 template<AllocationMode canAlloc,
typename U>
414 bool inner_enqueue(U&& element)
// Guard on the `enqueuing` flag; see ReentrantGuard.
417 ReentrantGuard guard(this->enqueuing);
427 Block* tailBlock_ = tailBlock.
load();
// localFront is a cached copy of front; it is refreshed from the atomic only when needed.
428 size_t blockFront = tailBlock_->localFront;
429 size_t blockTail = tailBlock_->tail.load();
// The tail block has room unless advancing tail by one slot would land on front.
431 size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
432 if (nextBlockTail != blockFront || nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) {
// Construct the element in place in the slot at the current tail index.
435 char* location = tailBlock_->data + blockTail *
sizeof(T);
436 new (location) T(std::forward<U>(element));
// Store the advanced tail index.
439 tailBlock_->tail = nextBlockTail;
// Tail block is full. If the block after it is not the front block, reuse that block.
443 if (tailBlock_->next.load() != frontBlock) {
452 Block* tailBlockNext = tailBlock_->next.load();
453 size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load();
454 nextBlockTail = tailBlockNext->tail.load();
// The block being reused is expected to be empty.
459 assert(nextBlockFront == nextBlockTail);
460 tailBlockNext->localFront = nextBlockFront;
462 char* location = tailBlockNext->data + nextBlockTail *
sizeof(T);
463 new (location) T(std::forward<U>(element));
465 tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;
// The reused block becomes the new tail block.
468 tailBlock = tailBlockNext;
// No reusable block: allocate a new one if the caller allows it.
470 else if (canAlloc == CanAlloc) {
// New blocks double in size until largestBlockSize reaches MAX_BLOCK_SIZE, then stay there.
472 auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2;
473 auto newBlock = make_block(newBlockSize);
// NOTE(review): the allocation-failure branch body is not visible in this listing.
474 if (newBlock ==
nullptr) {
478 largestBlockSize = newBlockSize;
// Construct the element in slot 0 of the fresh block and mark one slot as occupied.
480 new (newBlock->data) T(std::forward<U>(element));
482 assert(newBlock->front == 0);
483 newBlock->tail = newBlock->localTail = 1;
// Splice the new block into the ring directly after the current tail block.
485 newBlock->next = tailBlock_->next.load();
486 tailBlock_->next = newBlock;
495 tailBlock = newBlock;
// NOTE(review): the body of the CannotAlloc branch is not visible in this listing.
497 else if (canAlloc == CannotAlloc) {
// canAlloc can only be one of the two enumerators, so this point should never be reached.
502 assert(
false &&
"Should be unreachable code");
// Loop header only; the enclosing function and loop body are not visible in this listing
// (presumably part of ceilToPow2 — confirm). i takes the values 1, 2, 4, ... while it is
// below sizeof(size_t).
526 for (
size_t i = 1; i <
sizeof(size_t); i <<= 1) {
// Body of the pointer-alignment helper (signature not visible; make_block calls it as
// align_for<U>(ptr)). Returns ptr advanced by the smallest byte count that makes its
// address a multiple of U's alignment; the outer "% alignment" makes that count 0 when
// ptr is already aligned.
536 const std::size_t alignment = std::alignment_of<U>::value;
537 return ptr + (alignment - (
reinterpret_cast<std::uintptr_t
>(ptr) % alignment)) % alignment;
// Scope guard around a caller-owned bool flag (used with this->enqueuing and
// this->dequeuing). The destructor always resets the flag to false.
541 struct ReentrantGuard
// Binds to the caller's flag by reference.
543 ReentrantGuard(
bool& _inSection)
544 : inSection(_inSection)
// NOTE(review): the condition guarding this throw is not visible in this listing —
// presumably it fires when the flag is already set on entry; confirm.
548 throw std::runtime_error(
"ReaderWriterQueue does not support enqueuing or dequeuing elements from other elements' ctors and dtors");
554 ~ReentrantGuard() { inSection =
false; }
// Copy assignment is declared here; no definition is visible in this listing.
557 ReentrantGuard& operator=(ReentrantGuard
const&);
// Members of the Block header (fragmentary listing: the struct line and the localTail,
// localFront, data and rawThis declarations are not visible, although the constructor
// below initialises them).
// Index of the next slot to dequeue from.
567 weak_atomic<size_t> front;
// Padding sized as one cache line minus a weak_atomic<size_t> and a size_t.
// NOTE(review): presumably this keeps front and tail on separate cache lines — confirm
// against the full member layout.
570 char cachelineFiller0[
CACHE_LINE_SIZE -
sizeof(weak_atomic<size_t>) -
sizeof(
size_t)];
// Index of the next slot to enqueue into.
571 weak_atomic<size_t> tail;
574 char cachelineFiller1[
CACHE_LINE_SIZE -
sizeof(weak_atomic<size_t>) -
sizeof(
size_t)];
// Successor in the circular list of blocks.
575 weak_atomic<Block*> next;
// Capacity minus one; ANDed with an index to wrap it, which relies on the capacity being
// a power of two.
579 const size_t sizeMask;
// _size: slot count of the block; _rawThis: pointer stored in rawThis (make_block passes
// the std::malloc result); _data: start of the element storage.
583 Block(
size_t const& _size,
char* _rawThis,
char* _data)
// Starts empty: both indices and both cached copies are 0, and next is not yet linked.
584 : front(0), localTail(0), tail(0), localFront(0), next(nullptr), data(_data), sizeMask(_size - 1), rawThis(_rawThis)
// Copy assignment is declared here; no definition is visible in this listing.
590 Block& operator=(Block
const&);
// Allocates one Block header and the storage for `capacity` elements of T in a single
// std::malloc call (fragmentary listing: braces and the failure-branch body are not visible).
// Returns a pointer to the constructed Block; callers compare the result against nullptr
// to detect allocation failure.
597 static Block* make_block(
size_t capacity)
// Header size plus worst-case padding needed to align the header ...
600 auto size =
sizeof(Block) + std::alignment_of<Block>::value - 1;
// ... plus the element array and worst-case padding needed to align it.
601 size +=
sizeof(T) * capacity + std::alignment_of<T>::value - 1;
602 auto newBlockRaw =
static_cast<char*
>(std::malloc(size));
603 if (newBlockRaw ==
nullptr) {
// Align the header inside the raw allocation, then align the element storage after it.
607 auto newBlockAligned = align_for<Block>(newBlockRaw);
608 auto newBlockData = align_for<T>(newBlockAligned +
sizeof(Block));
// Construct the header in place; the raw pointer is stored in the block as rawThis.
609 return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
// Block that the dequeue-side functions (try_dequeue, peek, pop) read from.
613 weak_atomic<Block*> frontBlock;
// Block that inner_enqueue writes into.
616 weak_atomic<Block*> tailBlock;
// Size used for the most recently created block; inner_enqueue doubles it for new blocks
// until it reaches MAX_BLOCK_SIZE.
618 size_t largestBlockSize;
// Fragments of the blocking wrapper class (class line, signatures and most bodies are not
// visible in this listing). It has the same template parameters as ReaderWriterQueue.
627 template<
typename T,
size_t MAX_BLOCK_SIZE = 512>
// From enqueue(T&&): forward to the inner queue. NOTE(review): the success branch is not
// visible here — presumably it signals `sema`; confirm.
681 if (inner.
enqueue(std::forward<T>(element))) {
// From pop(): forward to the inner queue's pop() and keep its result.
734 bool result = inner.
pop();
// The wrapped non-blocking queue, and the semaphore held alongside it.
756 ReaderWriterQueue inner;
757 spsc_sema::LightweightSemaphore sema;
#define AE_UNUSED(x)
Definition: atomicops.h:41
#define AE_FORCEINLINE
Definition: atomicops.h:51
Definition: readerwriterqueue.h:629
AE_FORCEINLINE bool enqueue(T const &element)
Definition: readerwriterqueue.h:667
AE_FORCEINLINE bool try_enqueue(T &&element)
Definition: readerwriterqueue.h:654
AE_FORCEINLINE size_t size_approx() const
Definition: readerwriterqueue.h:744
AE_FORCEINLINE bool pop()
Definition: readerwriterqueue.h:731
AE_FORCEINLINE T * peek()
Definition: readerwriterqueue.h:723
void wait_dequeue(U &result)
Definition: readerwriterqueue.h:708
AE_FORCEINLINE bool try_enqueue(T const &element)
Definition: readerwriterqueue.h:642
BlockingReaderWriterQueue(size_t maxSize=15)
Definition: readerwriterqueue.h:634
bool try_dequeue(U &result)
Definition: readerwriterqueue.h:693
AE_FORCEINLINE bool enqueue(T &&element)
Definition: readerwriterqueue.h:679
Definition: readerwriterqueue.h:42
size_t size_approx() const
Definition: readerwriterqueue.h:394
bool pop()
Definition: readerwriterqueue.h:327
ReaderWriterQueue(size_t maxSize=15)
Definition: readerwriterqueue.h:68
AE_FORCEINLINE bool try_enqueue(T const &element)
Definition: readerwriterqueue.h:151
T * peek()
Definition: readerwriterqueue.h:285
AE_FORCEINLINE bool enqueue(T const &element)
Definition: readerwriterqueue.h:168
AE_FORCEINLINE bool enqueue(T &&element)
Definition: readerwriterqueue.h:176
AE_FORCEINLINE bool try_enqueue(T &&element)
Definition: readerwriterqueue.h:159
bool try_dequeue(U &result)
Definition: readerwriterqueue.h:186
~ReaderWriterQueue()
Definition: readerwriterqueue.h:121
bool tryWait()
Definition: atomicops.h:536
void wait()
Definition: atomicops.h:546
ssize_t availableApprox() const
Definition: atomicops.h:563
void signal(ssize_t count=1)
Definition: atomicops.h:552
AE_FORCEINLINE T load() const
Definition: atomicops.h:293
Definition: atomicops.h:68
@ memory_order_acquire
Definition: atomicops.h:72
@ memory_order_sync
Definition: atomicops.h:79
@ memory_order_release
Definition: atomicops.h:73
AE_FORCEINLINE void fence(memory_order order)
Definition: atomicops.h:193
AE_FORCEINLINE void compiler_fence(memory_order order)
Definition: atomicops.h:181
#define CACHE_LINE_SIZE
Definition: readerwriterqueue.h:29