GNU Radio's TEST Package
readerwriterqueue.h
Go to the documentation of this file.
1 // ©2013-2015 Cameron Desrochers.
2 // Distributed under the simplified BSD license (see the license file that
3 // should have come with this header).
4 
5 #pragma once
6 
7 #include "atomicops.h"
8 #include <type_traits>
9 #include <utility>
10 #include <cassert>
11 #include <stdexcept>
12 #include <cstdint>
13 #include <cstdlib> // For malloc/free & size_t
14 
15 
16 // A lock-free queue for a single-consumer, single-producer architecture.
17 // The queue is also wait-free in the common path (except if more memory
18 // needs to be allocated, in which case malloc is called).
19 // Allocates memory sparingly (O(lg(n) times, amortized), and only once if
20 // the original maximum size estimate is never exceeded.
21 // Tested on x86/x64 processors, but semantics should be correct for all
22 // architectures (given the right implementations in atomicops.h), provided
23 // that aligned integer and pointer accesses are naturally atomic.
24 // Note that there should only be one consumer thread and producer thread;
25 // Switching roles of the threads, or using multiple consecutive threads for
26 // one role, is not safe unless properly synchronized.
27 // Using the queue exclusively from one thread is fine, though a bit silly.
28 
29 #define CACHE_LINE_SIZE 64
30 
31 #ifdef AE_VCPP
32 #pragma warning(push)
33 #pragma warning(disable: 4324) // structure was padded due to __declspec(align())
34 #pragma warning(disable: 4820) // padding was added
35 #pragma warning(disable: 4127) // conditional expression is constant
36 #endif
37 
38 namespace moodycamel {
39 
40 template<typename T, size_t MAX_BLOCK_SIZE = 512>
42 {
43  // Design: Based on a queue-of-queues. The low-level queues are just
44  // circular buffers with front and tail indices indicating where the
45  // next element to dequeue is and where the next element can be enqueued,
46  // respectively. Each low-level queue is called a "block". Each block
47  // wastes exactly one element's worth of space to keep the design simple
48  // (if front == tail then the queue is empty, and can't be full).
49  // The high-level queue is a circular linked list of blocks; again there
50  // is a front and tail, but this time they are pointers to the blocks.
51  // The front block is where the next element to be dequeued is, provided
52  // the block is not empty. The back block is where elements are to be
53  // enqueued, provided the block is not full.
54  // The producer thread owns all the tail indices/pointers. The consumer
55  // thread owns all the front indices/pointers. Both threads read each
56  // other's variables, but only the owning thread updates them. E.g. After
57  // the consumer reads the producer's tail, the tail may change before the
58  // consumer is done dequeuing an object, but the consumer knows the tail
59  // will never go backwards, only forwards.
60  // If there is no room to enqueue an object, an additional block (of
61  // equal size to the last block) is added. Blocks are never removed.
62 
63 public:
64  // Constructs a queue that can hold maxSize elements without further
65  // allocations. If more than MAX_BLOCK_SIZE elements are requested,
66  // then several blocks of MAX_BLOCK_SIZE each are reserved (including
67  // at least one extra buffer block).
68  explicit ReaderWriterQueue(size_t maxSize = 15)
69 #ifndef NDEBUG
70  : enqueuing(false)
71  ,dequeuing(false)
72 #endif
73  {
74  assert(maxSize > 0);
75  assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) && "MAX_BLOCK_SIZE must be a power of 2");
76  assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2");
77 
78  Block* firstBlock = nullptr;
79 
80  largestBlockSize = ceilToPow2(maxSize + 1); // We need a spare slot to fit maxSize elements in the block
81  if (largestBlockSize > MAX_BLOCK_SIZE * 2) {
82  // We need a spare block in case the producer is writing to a different block the consumer is reading from, and
83  // wants to enqueue the maximum number of elements. We also need a spare element in each block to avoid the ambiguity
84  // between front == tail meaning "empty" and "full".
85  // So the effective number of slots that are guaranteed to be usable at any time is the block size - 1 times the
86  // number of blocks - 1. Solving for maxSize and applying a ceiling to the division gives us (after simplifying):
87  size_t initialBlockCount = (maxSize + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
88  largestBlockSize = MAX_BLOCK_SIZE;
89  Block* lastBlock = nullptr;
90  for (size_t i = 0; i != initialBlockCount; ++i) {
91  auto block = make_block(largestBlockSize);
92  if (block == nullptr) {
93  throw std::bad_alloc();
94  }
95  if (firstBlock == nullptr) {
96  firstBlock = block;
97  }
98  else {
99  lastBlock->next = block;
100  }
101  lastBlock = block;
102  block->next = firstBlock;
103  }
104  }
105  else {
106  firstBlock = make_block(largestBlockSize);
107  if (firstBlock == nullptr) {
108  throw std::bad_alloc();
109  }
110  firstBlock->next = firstBlock;
111  }
112  frontBlock = firstBlock;
113  tailBlock = firstBlock;
114 
115  // Make sure the reader/writer threads will have the initialized memory setup above:
117  }
118 
119  // Note: The queue should not be accessed concurrently while it's
120  // being deleted. It's up to the user to synchronize this.
122  {
123  // Make sure we get the latest version of all variables from other CPUs:
125 
126  // Destroy any remaining objects in queue and free memory
127  Block* frontBlock_ = frontBlock;
128  Block* block = frontBlock_;
129  do {
130  Block* nextBlock = block->next;
131  size_t blockFront = block->front;
132  size_t blockTail = block->tail;
133 
134  for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask) {
135  auto element = reinterpret_cast<T*>(block->data + i * sizeof(T));
136  element->~T();
137  (void)element;
138  }
139 
140  auto rawBlock = block->rawThis;
141  block->~Block();
142  std::free(rawBlock);
143  block = nextBlock;
144  } while (block != frontBlock_);
145  }
146 
147 
148  // Enqueues a copy of element if there is room in the queue.
149  // Returns true if the element was enqueued, false otherwise.
150  // Does not allocate memory.
151  AE_FORCEINLINE bool try_enqueue(T const& element)
152  {
153  return inner_enqueue<CannotAlloc>(element);
154  }
155 
156  // Enqueues a moved copy of element if there is room in the queue.
157  // Returns true if the element was enqueued, false otherwise.
158  // Does not allocate memory.
159  AE_FORCEINLINE bool try_enqueue(T&& element)
160  {
161  return inner_enqueue<CannotAlloc>(std::forward<T>(element));
162  }
163 
164 
165  // Enqueues a copy of element on the queue.
166  // Allocates an additional block of memory if needed.
167  // Only fails (returns false) if memory allocation fails.
168  AE_FORCEINLINE bool enqueue(T const& element)
169  {
170  return inner_enqueue<CanAlloc>(element);
171  }
172 
173  // Enqueues a moved copy of element on the queue.
174  // Allocates an additional block of memory if needed.
175  // Only fails (returns false) if memory allocation fails.
176  AE_FORCEINLINE bool enqueue(T&& element)
177  {
178  return inner_enqueue<CanAlloc>(std::forward<T>(element));
179  }
180 
181 
182  // Attempts to dequeue an element; if the queue is empty,
183  // returns false instead. If the queue has at least one element,
184  // moves front to result using operator=, then returns true.
185  template<typename U>
186  bool try_dequeue(U& result)
187  {
188 #ifndef NDEBUG
189  ReentrantGuard guard(this->dequeuing);
190 #endif
191 
192  // High-level pseudocode:
193  // Remember where the tail block is
194  // If the front block has an element in it, dequeue it
195  // Else
196  // If front block was the tail block when we entered the function, return false
197  // Else advance to next block and dequeue the item there
198 
199  // Note that we have to use the value of the tail block from before we check if the front
200  // block is full or not, in case the front block is empty and then, before we check if the
201  // tail block is at the front block or not, the producer fills up the front block *and
202  // moves on*, which would make us skip a filled block. Seems unlikely, but was consistently
203  // reproducible in practice.
204  // In order to avoid overhead in the common case, though, we do a double-checked pattern
205  // where we have the fast path if the front block is not empty, then read the tail block,
206  // then re-read the front block and check if it's not empty again, then check if the tail
207  // block has advanced.
208 
209  Block* frontBlock_ = frontBlock.load();
210  size_t blockTail = frontBlock_->localTail;
211  size_t blockFront = frontBlock_->front.load();
212 
213  if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
215 
216  non_empty_front_block:
217  // Front block not empty, dequeue from here
218  auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
219  result = std::move(*element);
220  element->~T();
221 
222  blockFront = (blockFront + 1) & frontBlock_->sizeMask;
223 
225  frontBlock_->front = blockFront;
226  }
227  else if (frontBlock_ != tailBlock.load()) {
229 
230  frontBlock_ = frontBlock.load();
231  blockTail = frontBlock_->localTail = frontBlock_->tail.load();
232  blockFront = frontBlock_->front.load();
234 
235  if (blockFront != blockTail) {
236  // Oh look, the front block isn't empty after all
237  goto non_empty_front_block;
238  }
239 
240  // Front block is empty but there's another block ahead, advance to it
241  Block* nextBlock = frontBlock_->next;
242  // Don't need an acquire fence here since next can only ever be set on the tailBlock,
243  // and we're not the tailBlock, and we did an acquire earlier after reading tailBlock which
244  // ensures next is up-to-date on this CPU in case we recently were at tailBlock.
245 
246  size_t nextBlockFront = nextBlock->front.load();
247  size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
249 
250  // Since the tailBlock is only ever advanced after being written to,
251  // we know there's for sure an element to dequeue on it
252  assert(nextBlockFront != nextBlockTail);
253  AE_UNUSED(nextBlockTail);
254 
255  // We're done with this block, let the producer use it if it needs
256  fence(memory_order_release); // Expose possibly pending changes to frontBlock->front from last dequeue
257  frontBlock = frontBlock_ = nextBlock;
258 
259  compiler_fence(memory_order_release); // Not strictly needed
260 
261  auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
262 
263  result = std::move(*element);
264  element->~T();
265 
266  nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
267 
269  frontBlock_->front = nextBlockFront;
270  }
271  else {
272  // No elements in current block and no other block to advance to
273  return false;
274  }
275 
276  return true;
277  }
278 
279 
280  // Returns a pointer to the front element in the queue (the one that
281  // would be removed next by a call to `try_dequeue` or `pop`). If the
282  // queue appears empty at the time the method is called, nullptr is
283  // returned instead.
284  // Must be called only from the consumer thread.
285  T* peek()
286  {
287 #ifndef NDEBUG
288  ReentrantGuard guard(this->dequeuing);
289 #endif
290  // See try_dequeue() for reasoning
291 
292  Block* frontBlock_ = frontBlock.load();
293  size_t blockTail = frontBlock_->localTail;
294  size_t blockFront = frontBlock_->front.load();
295 
296  if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
298  non_empty_front_block:
299  return reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
300  }
301  else if (frontBlock_ != tailBlock.load()) {
303  frontBlock_ = frontBlock.load();
304  blockTail = frontBlock_->localTail = frontBlock_->tail.load();
305  blockFront = frontBlock_->front.load();
307 
308  if (blockFront != blockTail) {
309  goto non_empty_front_block;
310  }
311 
312  Block* nextBlock = frontBlock_->next;
313 
314  size_t nextBlockFront = nextBlock->front.load();
316 
317  assert(nextBlockFront != nextBlock->tail.load());
318  return reinterpret_cast<T*>(nextBlock->data + nextBlockFront * sizeof(T));
319  }
320 
321  return nullptr;
322  }
323 
324  // Removes the front element from the queue, if any, without returning it.
325  // Returns true on success, or false if the queue appeared empty at the time
326  // `pop` was called.
327  bool pop()
328  {
329 #ifndef NDEBUG
330  ReentrantGuard guard(this->dequeuing);
331 #endif
332  // See try_dequeue() for reasoning
333 
334  Block* frontBlock_ = frontBlock.load();
335  size_t blockTail = frontBlock_->localTail;
336  size_t blockFront = frontBlock_->front.load();
337 
338  if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
340 
341  non_empty_front_block:
342  auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
343  element->~T();
344 
345  blockFront = (blockFront + 1) & frontBlock_->sizeMask;
346 
348  frontBlock_->front = blockFront;
349  }
350  else if (frontBlock_ != tailBlock.load()) {
352  frontBlock_ = frontBlock.load();
353  blockTail = frontBlock_->localTail = frontBlock_->tail.load();
354  blockFront = frontBlock_->front.load();
356 
357  if (blockFront != blockTail) {
358  goto non_empty_front_block;
359  }
360 
361  // Front block is empty but there's another block ahead, advance to it
362  Block* nextBlock = frontBlock_->next;
363 
364  size_t nextBlockFront = nextBlock->front.load();
365  size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
367 
368  assert(nextBlockFront != nextBlockTail);
369  AE_UNUSED(nextBlockTail);
370 
372  frontBlock = frontBlock_ = nextBlock;
373 
375 
376  auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
377  element->~T();
378 
379  nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
380 
382  frontBlock_->front = nextBlockFront;
383  }
384  else {
385  // No elements in current block and no other block to advance to
386  return false;
387  }
388 
389  return true;
390  }
391 
392  // Returns the approximate number of items currently in the queue.
393  // Safe to call from both the producer and consumer threads.
394  inline size_t size_approx() const
395  {
396  size_t result = 0;
397  Block* frontBlock_ = frontBlock.load();
398  Block* block = frontBlock_;
399  do {
401  size_t blockFront = block->front.load();
402  size_t blockTail = block->tail.load();
403  result += (blockTail - blockFront) & block->sizeMask;
404  block = block->next.load();
405  } while (block != frontBlock_);
406  return result;
407  }
408 
409 
410 private:
411  enum AllocationMode { CanAlloc, CannotAlloc };
412 
413  template<AllocationMode canAlloc, typename U>
414  bool inner_enqueue(U&& element)
415  {
416 #ifndef NDEBUG
417  ReentrantGuard guard(this->enqueuing);
418 #endif
419 
420  // High-level pseudocode (assuming we're allowed to alloc a new block):
421  // If room in tail block, add to tail
422  // Else check next block
423  // If next block is not the head block, enqueue on next block
424  // Else create a new block and enqueue there
425  // Advance tail to the block we just enqueued to
426 
427  Block* tailBlock_ = tailBlock.load();
428  size_t blockFront = tailBlock_->localFront;
429  size_t blockTail = tailBlock_->tail.load();
430 
431  size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
432  if (nextBlockTail != blockFront || nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) {
434  // This block has room for at least one more element
435  char* location = tailBlock_->data + blockTail * sizeof(T);
436  new (location) T(std::forward<U>(element));
437 
439  tailBlock_->tail = nextBlockTail;
440  }
441  else {
443  if (tailBlock_->next.load() != frontBlock) {
444  // Note that the reason we can't advance to the frontBlock and start adding new entries there
445  // is because if we did, then dequeue would stay in that block, eventually reading the new values,
446  // instead of advancing to the next full block (whose values were enqueued first and so should be
447  // consumed first).
448 
449  fence(memory_order_acquire); // Ensure we get latest writes if we got the latest frontBlock
450 
451  // tailBlock is full, but there's a free block ahead, use it
452  Block* tailBlockNext = tailBlock_->next.load();
453  size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load();
454  nextBlockTail = tailBlockNext->tail.load();
456 
457  // This block must be empty since it's not the head block and we
458  // go through the blocks in a circle
459  assert(nextBlockFront == nextBlockTail);
460  tailBlockNext->localFront = nextBlockFront;
461 
462  char* location = tailBlockNext->data + nextBlockTail * sizeof(T);
463  new (location) T(std::forward<U>(element));
464 
465  tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;
466 
468  tailBlock = tailBlockNext;
469  }
470  else if (canAlloc == CanAlloc) {
471  // tailBlock is full and there's no free block ahead; create a new block
472  auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2;
473  auto newBlock = make_block(newBlockSize);
474  if (newBlock == nullptr) {
475  // Could not allocate a block!
476  return false;
477  }
478  largestBlockSize = newBlockSize;
479 
480  new (newBlock->data) T(std::forward<U>(element));
481 
482  assert(newBlock->front == 0);
483  newBlock->tail = newBlock->localTail = 1;
484 
485  newBlock->next = tailBlock_->next.load();
486  tailBlock_->next = newBlock;
487 
488  // Might be possible for the dequeue thread to see the new tailBlock->next
489  // *without* seeing the new tailBlock value, but this is OK since it can't
490  // advance to the next block until tailBlock is set anyway (because the only
491  // case where it could try to read the next is if it's already at the tailBlock,
492  // and it won't advance past tailBlock in any circumstance).
493 
495  tailBlock = newBlock;
496  }
497  else if (canAlloc == CannotAlloc) {
498  // Would have had to allocate a new block to enqueue, but not allowed
499  return false;
500  }
501  else {
502  assert(false && "Should be unreachable code");
503  return false;
504  }
505  }
506 
507  return true;
508  }
509 
510 
511  // Disable copying
513 
514  // Disable assignment
515  ReaderWriterQueue& operator=(ReaderWriterQueue const&) { }
516 
517 
518 
519  AE_FORCEINLINE static size_t ceilToPow2(size_t x)
520  {
521  // From http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
522  --x;
523  x |= x >> 1;
524  x |= x >> 2;
525  x |= x >> 4;
526  for (size_t i = 1; i < sizeof(size_t); i <<= 1) {
527  x |= x >> (i << 3);
528  }
529  ++x;
530  return x;
531  }
532 
533  template<typename U>
534  static AE_FORCEINLINE char* align_for(char* ptr)
535  {
536  const std::size_t alignment = std::alignment_of<U>::value;
537  return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
538  }
539 private:
540 #ifndef NDEBUG
541  struct ReentrantGuard
542  {
543  ReentrantGuard(bool& _inSection)
544  : inSection(_inSection)
545  {
546  assert(!inSection);
547  if (inSection) {
548  throw std::runtime_error("ReaderWriterQueue does not support enqueuing or dequeuing elements from other elements' ctors and dtors");
549  }
550 
551  inSection = true;
552  }
553 
554  ~ReentrantGuard() { inSection = false; }
555 
556  private:
557  ReentrantGuard& operator=(ReentrantGuard const&);
558 
559  private:
560  bool& inSection;
561  };
562 #endif
563 
564  struct Block
565  {
566  // Avoid false-sharing by putting highly contended variables on their own cache lines
567  weak_atomic<size_t> front; // (Atomic) Elements are read from here
568  size_t localTail; // An uncontended shadow copy of tail, owned by the consumer
569 
570  char cachelineFiller0[CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)];
571  weak_atomic<size_t> tail; // (Atomic) Elements are enqueued here
572  size_t localFront;
573 
574  char cachelineFiller1[CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)]; // next isn't very contended, but we don't want it on the same cache line as tail (which is)
575  weak_atomic<Block*> next; // (Atomic)
576 
577  char* data; // Contents (on heap) are aligned to T's alignment
578 
579  const size_t sizeMask;
580 
581 
582  // size must be a power of two (and greater than 0)
583  Block(size_t const& _size, char* _rawThis, char* _data)
584  : front(0), localTail(0), tail(0), localFront(0), next(nullptr), data(_data), sizeMask(_size - 1), rawThis(_rawThis)
585  {
586  }
587 
588  private:
589  // C4512 - Assignment operator could not be generated
590  Block& operator=(Block const&);
591 
592  public:
593  char* rawThis;
594  };
595 
596 
597  static Block* make_block(size_t capacity)
598  {
599  // Allocate enough memory for the block itself, as well as all the elements it will contain
600  auto size = sizeof(Block) + std::alignment_of<Block>::value - 1;
601  size += sizeof(T) * capacity + std::alignment_of<T>::value - 1;
602  auto newBlockRaw = static_cast<char*>(std::malloc(size));
603  if (newBlockRaw == nullptr) {
604  return nullptr;
605  }
606 
607  auto newBlockAligned = align_for<Block>(newBlockRaw);
608  auto newBlockData = align_for<T>(newBlockAligned + sizeof(Block));
609  return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
610  }
611 
612 private:
613  weak_atomic<Block*> frontBlock; // (Atomic) Elements are enqueued to this block
614 
615  char cachelineFiller[CACHE_LINE_SIZE - sizeof(weak_atomic<Block*>)];
616  weak_atomic<Block*> tailBlock; // (Atomic) Elements are dequeued from this block
617 
618  size_t largestBlockSize;
619 
620 #ifndef NDEBUG
621  bool enqueuing;
622  bool dequeuing;
623 #endif
624 };
625 
626 // Like ReaderWriterQueue, but also providees blocking operations
627 template<typename T, size_t MAX_BLOCK_SIZE = 512>
629 {
630 private:
631  typedef ::moodycamel::ReaderWriterQueue<T, MAX_BLOCK_SIZE> ReaderWriterQueue;
632 
633 public:
634  explicit BlockingReaderWriterQueue(size_t maxSize = 15)
635  : inner(maxSize)
636  { }
637 
638 
639  // Enqueues a copy of element if there is room in the queue.
640  // Returns true if the element was enqueued, false otherwise.
641  // Does not allocate memory.
642  AE_FORCEINLINE bool try_enqueue(T const& element)
643  {
644  if (inner.try_enqueue(element)) {
645  sema.signal();
646  return true;
647  }
648  return false;
649  }
650 
651  // Enqueues a moved copy of element if there is room in the queue.
652  // Returns true if the element was enqueued, false otherwise.
653  // Does not allocate memory.
654  AE_FORCEINLINE bool try_enqueue(T&& element)
655  {
656  if (inner.try_enqueue(std::forward<T>(element))) {
657  sema.signal();
658  return true;
659  }
660  return false;
661  }
662 
663 
664  // Enqueues a copy of element on the queue.
665  // Allocates an additional block of memory if needed.
666  // Only fails (returns false) if memory allocation fails.
667  AE_FORCEINLINE bool enqueue(T const& element)
668  {
669  if (inner.enqueue(element)) {
670  sema.signal();
671  return true;
672  }
673  return false;
674  }
675 
676  // Enqueues a moved copy of element on the queue.
677  // Allocates an additional block of memory if needed.
678  // Only fails (returns false) if memory allocation fails.
679  AE_FORCEINLINE bool enqueue(T&& element)
680  {
681  if (inner.enqueue(std::forward<T>(element))) {
682  sema.signal();
683  return true;
684  }
685  return false;
686  }
687 
688 
689  // Attempts to dequeue an element; if the queue is empty,
690  // returns false instead. If the queue has at least one element,
691  // moves front to result using operator=, then returns true.
692  template<typename U>
693  bool try_dequeue(U& result)
694  {
695  if (sema.tryWait()) {
696  bool success = inner.try_dequeue(result);
697  assert(success);
698  AE_UNUSED(success);
699  return true;
700  }
701  return false;
702  }
703 
704 
705  // Attempts to dequeue an element; if the queue is empty,
706  // waits until an element is available, then dequeues it.
707  template<typename U>
708  void wait_dequeue(U& result)
709  {
710  sema.wait();
711  bool success = inner.try_dequeue(result);
712  AE_UNUSED(result);
713  assert(success);
714  AE_UNUSED(success);
715  }
716 
717 
718  // Returns a pointer to the front element in the queue (the one that
719  // would be removed next by a call to `try_dequeue` or `pop`). If the
720  // queue appears empty at the time the method is called, nullptr is
721  // returned instead.
722  // Must be called only from the consumer thread.
724  {
725  return inner.peek();
726  }
727 
728  // Removes the front element from the queue, if any, without returning it.
729  // Returns true on success, or false if the queue appeared empty at the time
730  // `pop` was called.
732  {
733  if (sema.tryWait()) {
734  bool result = inner.pop();
735  assert(result);
736  AE_UNUSED(result);
737  return true;
738  }
739  return false;
740  }
741 
742  // Returns the approximate number of items currently in the queue.
743  // Safe to call from both the producer and consumer threads.
745  {
746  return sema.availableApprox();
747  }
748 
749 
750 private:
751  // Disable copying & assignment
753  BlockingReaderWriterQueue& operator=(ReaderWriterQueue const&) { }
754 
755 private:
756  ReaderWriterQueue inner;
757  spsc_sema::LightweightSemaphore sema;
758 };
759 
760 } // end namespace moodycamel
761 
762 #ifdef AE_VCPP
763 #pragma warning(pop)
764 #endif
#define AE_UNUSED(x)
Definition: atomicops.h:41
#define AE_FORCEINLINE
Definition: atomicops.h:51
Definition: readerwriterqueue.h:629
AE_FORCEINLINE bool enqueue(T const &element)
Definition: readerwriterqueue.h:667
AE_FORCEINLINE bool try_enqueue(T &&element)
Definition: readerwriterqueue.h:654
AE_FORCEINLINE size_t size_approx() const
Definition: readerwriterqueue.h:744
AE_FORCEINLINE bool pop()
Definition: readerwriterqueue.h:731
AE_FORCEINLINE T * peek()
Definition: readerwriterqueue.h:723
void wait_dequeue(U &result)
Definition: readerwriterqueue.h:708
AE_FORCEINLINE bool try_enqueue(T const &element)
Definition: readerwriterqueue.h:642
BlockingReaderWriterQueue(size_t maxSize=15)
Definition: readerwriterqueue.h:634
bool try_dequeue(U &result)
Definition: readerwriterqueue.h:693
AE_FORCEINLINE bool enqueue(T &&element)
Definition: readerwriterqueue.h:679
Definition: readerwriterqueue.h:42
size_t size_approx() const
Definition: readerwriterqueue.h:394
bool pop()
Definition: readerwriterqueue.h:327
ReaderWriterQueue(size_t maxSize=15)
Definition: readerwriterqueue.h:68
AE_FORCEINLINE bool try_enqueue(T const &element)
Definition: readerwriterqueue.h:151
T * peek()
Definition: readerwriterqueue.h:285
AE_FORCEINLINE bool enqueue(T const &element)
Definition: readerwriterqueue.h:168
AE_FORCEINLINE bool enqueue(T &&element)
Definition: readerwriterqueue.h:176
AE_FORCEINLINE bool try_enqueue(T &&element)
Definition: readerwriterqueue.h:159
bool try_dequeue(U &result)
Definition: readerwriterqueue.h:186
~ReaderWriterQueue()
Definition: readerwriterqueue.h:121
bool tryWait()
Definition: atomicops.h:536
void wait()
Definition: atomicops.h:546
ssize_t availableApprox() const
Definition: atomicops.h:563
void signal(ssize_t count=1)
Definition: atomicops.h:552
AE_FORCEINLINE T load() const
Definition: atomicops.h:293
Definition: atomicops.h:68
@ memory_order_acquire
Definition: atomicops.h:72
@ memory_order_sync
Definition: atomicops.h:79
@ memory_order_release
Definition: atomicops.h:73
AE_FORCEINLINE void fence(memory_order order)
Definition: atomicops.h:193
AE_FORCEINLINE void compiler_fence(memory_order order)
Definition: atomicops.h:181
#define CACHE_LINE_SIZE
Definition: readerwriterqueue.h:29