quill
BoundedSPSCQueue.h
1 #pragma once
2 
3 #include "quill/core/Attributes.h"
4 #include "quill/core/Common.h"
5 #include "quill/core/MathUtilities.h"
6 #include "quill/core/QuillError.h"
7 
8 #include <atomic>
9 #include <cerrno>
10 #include <cstddef>
11 #include <cstdint>
12 #include <cstring>
13 #include <string>
14 
15 #if defined(_WIN32)
16  #include <malloc.h>
17 #else
18  #include <sys/mman.h>
19 #endif
20 
21 #if defined(QUILL_X86ARCH)
22  #if defined(_WIN32)
23  #include <intrin.h>
24  #else
25  #if __has_include(<x86gprintrin.h>)
26  #if defined(__GNUC__) && __GNUC__ > 10
27  #include <emmintrin.h>
28  #include <x86gprintrin.h>
29  #elif defined(__clang_major__)
30  // clang needs immintrin for _mm_clflushopt
31  #include <immintrin.h>
32  #endif
33  #else
34  #include <immintrin.h>
35  #include <x86intrin.h>
36  #endif
37  #endif
38 #endif
39 
40 QUILL_BEGIN_NAMESPACE
41 
42 namespace detail
43 {
44 
45 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
46  #pragma warning(push)
47  #pragma warning(disable : 4324)
48 #endif
49 
53 template <typename T>
55 {
56 public:
57  using integer_type = T;
58 
59  static_assert(integer_type{0} < static_cast<integer_type>(~integer_type{0}),
60  "BoundedSPSCQueueImpl integer_type must be unsigned");
61 
63  {
64  std::byte* write_buffer{nullptr};
65  integer_type writer_pos{0};
66  BoundedSPSCQueueImpl* bounded_queue{nullptr};
67  };
68 
69  QUILL_ATTRIBUTE_HOT explicit BoundedSPSCQueueImpl(integer_type capacity,
70  HugePagesPolicy huge_pages_policy = HugePagesPolicy::Never,
71  integer_type reader_store_percent = 5)
72  : _capacity(_validate_capacity(capacity)),
73  _mask(_capacity - 1),
74  _bytes_per_batch(static_cast<integer_type>(
75  (static_cast<double>(_capacity) * static_cast<double>(reader_store_percent)) / 100.0)),
76  _storage(static_cast<std::byte*>(_alloc_aligned(
77  2u * static_cast<size_t>(_capacity), QUILL_CACHE_LINE_ALIGNED, huge_pages_policy))),
78  _huge_pages_policy(huge_pages_policy)
79  {
80  size_t const storage_size = 2u * static_cast<size_t>(_capacity);
81  std::memset(_storage, 0, storage_size);
82 
83  _atomic_writer_pos.store(0);
84  _atomic_reader_pos.store(0);
85 
86  // reader_pos starts at 0, so cached value is 0 + capacity
87  _reader_pos_cache_plus_capacity = _capacity;
88 
89 #if defined(QUILL_X86ARCH)
90  // remove log memory from cache
91  for (size_t i = 0; i < storage_size; i += QUILL_CACHE_LINE_SIZE)
92  {
93  _mm_clflush(_storage + i);
94  }
95 
96  uint64_t const cache_lines = (_capacity >= 2048) ? 32 : 16;
97 
98  for (uint64_t i = 0; i < cache_lines; ++i)
99  {
100  _mm_prefetch(reinterpret_cast<char const*>(_storage + (QUILL_CACHE_LINE_SIZE * i)), _MM_HINT_T0);
101  }
102 #endif
103  }
104 
105  ~BoundedSPSCQueueImpl() { _free_aligned(_storage); }
106 
110  BoundedSPSCQueueImpl(BoundedSPSCQueueImpl const&) = delete;
111  BoundedSPSCQueueImpl& operator=(BoundedSPSCQueueImpl const&) = delete;
112 
113  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_write(integer_type n) noexcept
114  {
115  if (static_cast<integer_type>(_reader_pos_cache_plus_capacity - _writer_pos) < n)
116  {
117  // not enough space, we need to load reader and re-check
118  _reader_pos_cache_plus_capacity = _atomic_reader_pos.load(std::memory_order_acquire) + _capacity;
119 
120  if (static_cast<integer_type>(_reader_pos_cache_plus_capacity - _writer_pos) < n)
121  {
122  return nullptr;
123  }
124  }
125 
126  return _storage + (_writer_pos & _mask);
127  }
128 
129  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT WriteReservation prepare_write_reserve_cached(integer_type n) noexcept
130  {
131  integer_type const writer_pos = _writer_pos;
132 
133  if (QUILL_UNLIKELY(static_cast<integer_type>(_reader_pos_cache_plus_capacity - writer_pos) < n))
134  {
135  return WriteReservation{nullptr, writer_pos, this};
136  }
137 
138  // _storage is never null after construction; hint lets the compiler
139  // eliminate a redundant null-check on the computed write pointer.
140  QUILL_ASSUME(_storage != nullptr);
141 
142  return WriteReservation{_storage + (writer_pos & _mask), writer_pos, this};
143  }
144 
145  QUILL_ATTRIBUTE_HOT void finish_write(integer_type n) noexcept { _writer_pos += n; }
146 
147  QUILL_ATTRIBUTE_HOT void commit_write() noexcept
148  {
149  // set the atomic flag so the reader can see write
150  _atomic_writer_pos.store(_writer_pos, std::memory_order_release);
151 
152 #if defined(QUILL_X86ARCH)
153  // flush writen cache lines
154  _flush_cachelines(_last_flushed_writer_pos, _writer_pos);
155 
156  // prefetch a future cache line
157  _mm_prefetch(
158  reinterpret_cast<char const*>(_storage + ((_writer_pos + QUILL_CACHE_LINE_SIZE * 10) & _mask)), _MM_HINT_T0);
159 #endif
160  }
161 
165  QUILL_ATTRIBUTE_HOT void finish_and_commit_write(integer_type n) noexcept
166  {
167  finish_and_commit_write_reservation(_writer_pos + n);
168  }
169 
170  QUILL_ATTRIBUTE_HOT void finish_and_commit_write_reservation(integer_type new_writer_pos) noexcept
171  {
172  _writer_pos = new_writer_pos;
173 
174  // set the atomic flag so the reader can see write
175  _atomic_writer_pos.store(new_writer_pos, std::memory_order_release);
176 
177 #if defined(QUILL_X86ARCH)
178  // flush writen cache lines
179  _flush_cachelines(_last_flushed_writer_pos, new_writer_pos);
180 
181  // prefetch a future cache line
182  _mm_prefetch(
183  reinterpret_cast<char const*>(_storage + ((new_writer_pos + QUILL_CACHE_LINE_SIZE * 10) & _mask)), _MM_HINT_T0);
184 #endif
185  }
186 
187  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_read() noexcept
188  {
189  if (empty())
190  {
191  return nullptr;
192  }
193 
194  return _storage + (_reader_pos & _mask);
195  }
196 
197  QUILL_ATTRIBUTE_HOT void finish_read(integer_type n) noexcept { _reader_pos += n; }
198 
199  QUILL_ATTRIBUTE_HOT void commit_read() noexcept
200  {
201  if (static_cast<integer_type>(_reader_pos - _atomic_reader_pos.load(std::memory_order_relaxed)) >= _bytes_per_batch)
202  {
203  _atomic_reader_pos.store(_reader_pos, std::memory_order_release);
204 
205 #if defined(QUILL_X86ARCH)
206  _flush_cachelines(_last_flushed_reader_pos, _reader_pos);
207 #endif
208  }
209  }
210 
215  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
216  {
217  if (_writer_pos_cache == _reader_pos)
218  {
219  // if we think the queue is empty we also load the atomic variable to check further
220  _writer_pos_cache = _atomic_writer_pos.load(std::memory_order_acquire);
221 
222  if (_writer_pos_cache == _reader_pos)
223  {
224  return true;
225  }
226  }
227 
228  return false;
229  }
230 
231  QUILL_NODISCARD integer_type capacity() const noexcept
232  {
233  return static_cast<integer_type>(_capacity);
234  }
235 
236  QUILL_NODISCARD HugePagesPolicy huge_pages_policy() const noexcept { return _huge_pages_policy; }
237 
238 private:
#if defined(QUILL_X86ARCH)
  /**
   * Flushes (clflushopt) every buffer cache line from the line containing `last` up to, but not
   * including, the line containing `offset`, then sets `last` to the start of `offset`'s line.
   *
   * The line that contains `offset` is not flushed, presumably because it may still be partially
   * filled.
   *
   * Positions are free-running counters that wrap modulo integer_type. The range is therefore
   * tracked with modular arithmetic and an inequality test rather than `offset > last`. A
   * magnitude comparison stops flushing after the first wrap-around when integer_type is narrower
   * than 64 bits; _validate_capacity() explicitly supports 8/16/32-bit position types. For 64-bit
   * positions the behaviour is unchanged.
   *
   * @param last last flushed position (cache-line aligned after the first flush); updated in place
   * @param offset current reader or writer position
   */
  QUILL_ATTRIBUTE_HOT void _flush_cachelines(integer_type& last, integer_type offset) noexcept
  {
    integer_type line = static_cast<integer_type>(last - (last & QUILL_CACHE_LINE_MASK));
    integer_type const end_line = static_cast<integer_type>(offset - (offset & QUILL_CACHE_LINE_MASK));

    // `last` never overtakes `offset` (both positions only move forward), so equality means
    // there is nothing new to flush.
    if (line == end_line)
    {
      return;
    }

    do
    {
      _mm_clflushopt(_storage + (line & _mask));
      line = static_cast<integer_type>(line + QUILL_CACHE_LINE_SIZE);
    } while (line != end_line);

    last = end_line;
  }
#endif
257 
264  QUILL_NODISCARD static std::byte* _align_pointer(void* pointer, size_t alignment) noexcept
265  {
266  QUILL_ASSERT(is_power_of_two(alignment),
267  "alignment must be a power of two in BoundedSPSCQueue::_align_pointer()");
268  return reinterpret_cast<std::byte*>((reinterpret_cast<uintptr_t>(pointer) + (alignment - 1ul)) &
269  ~(alignment - 1ul));
270  }
271 
288  static integer_type _validate_capacity(QUILL_MAYBE_UNUSED integer_type capacity)
289  {
290 #if defined(QUILL_X86ARCH)
291  if (capacity < 1024)
292  {
293  QUILL_THROW(QuillError{"Capacity must be at least 1024"});
294  }
295 #endif
296 
297  size_t const rounded_capacity = next_power_of_two(static_cast<size_t>(capacity));
298 
299  constexpr auto max_integer_capacity = []() constexpr
300  {
301  // avoid including limits
302  if constexpr (sizeof(integer_type) >= sizeof(size_t))
303  {
304  return SIZE_MAX;
305  }
306  else if constexpr (sizeof(integer_type) <= sizeof(uint8_t))
307  {
308  return static_cast<size_t>(UINT8_MAX);
309  }
310  else if constexpr (sizeof(integer_type) <= sizeof(uint16_t))
311  {
312  return static_cast<size_t>(UINT16_MAX);
313  }
314  else if constexpr (sizeof(integer_type) <= sizeof(uint32_t))
315  {
316  return static_cast<size_t>(UINT32_MAX);
317  }
318  else
319  {
320  return static_cast<size_t>(UINT64_MAX);
321  }
322  }();
323 
324  if (QUILL_UNLIKELY(rounded_capacity > max_integer_capacity))
325  {
326  QUILL_THROW(QuillError{"BoundedSPSCQueue capacity exceeds the queue position type"});
327  }
328 
329  // The logical capacity is tracked as integer_type, but the backing allocation is byte-sized:
330  // two mirrored queue regions plus POSIX alignment metadata. Validate the allocation size in
331  // size_t before multiplying by two, otherwise an extreme capacity can wrap to a tiny mmap.
332  constexpr size_t metadata_size{2u * sizeof(size_t)};
333  constexpr size_t max_rounded_capacity = (SIZE_MAX - metadata_size - QUILL_CACHE_LINE_ALIGNED) / 2u;
334 
335  if (QUILL_UNLIKELY(rounded_capacity > max_rounded_capacity))
336  {
337  QUILL_THROW(QuillError{"BoundedSPSCQueue capacity is too large"});
338  }
339 
340  return static_cast<integer_type>(rounded_capacity);
341  }
342 
343  QUILL_NODISCARD static void* _alloc_aligned(size_t size, size_t alignment,
344  QUILL_MAYBE_UNUSED HugePagesPolicy huge_pages_policy)
345  {
346 #if defined(_WIN32)
347  void* p = _aligned_malloc(size, alignment);
348 
349  if (!p)
350  {
351  QUILL_THROW(QuillError{std::string{"_alloc_aligned failed with error message errno: "} +
352  std::to_string(errno)});
353  }
354 
355  return p;
356 #else
357  // Calculate the total size including the metadata and alignment
358  constexpr size_t metadata_size{2u * sizeof(size_t)};
359  size_t const total_size{size + metadata_size + alignment};
360 
361  // Allocate the memory
362  int flags = MAP_PRIVATE | MAP_ANONYMOUS;
363 
364  #if defined(__linux__)
365  if (huge_pages_policy != HugePagesPolicy::Never)
366  {
367  flags |= MAP_HUGETLB;
368  }
369  #endif
370 
371  void* mem = ::mmap(nullptr, total_size, PROT_READ | PROT_WRITE, flags, -1, 0);
372 
373  #if defined(__linux__)
374  if ((mem == MAP_FAILED) && (huge_pages_policy == HugePagesPolicy::Try))
375  {
376  // we tried but failed allocating huge pages, try normal pages instead
377  flags &= ~MAP_HUGETLB;
378  mem = ::mmap(nullptr, total_size, PROT_READ | PROT_WRITE, flags, -1, 0);
379  }
380  #endif
381 
382  if (mem == MAP_FAILED)
383  {
384  QUILL_THROW(QuillError{std::string{"mmap failed. errno: "} + std::to_string(errno) +
385  " error: " + std::strerror(errno)});
386  }
387 
388  // Calculate the aligned address after the metadata
389  std::byte* aligned_address = _align_pointer(static_cast<std::byte*>(mem) + metadata_size, alignment);
390 
391  // Calculate the offset from the original memory location
392  auto const offset = static_cast<size_t>(aligned_address - static_cast<std::byte*>(mem));
393 
394  // Store the size and offset information in the metadata
395  std::memcpy(aligned_address - sizeof(size_t), &total_size, sizeof(total_size));
396  std::memcpy(aligned_address - (2u * sizeof(size_t)), &offset, sizeof(offset));
397 
398  return aligned_address;
399 #endif
400  }
401 
406  void static _free_aligned(void* ptr) noexcept
407  {
408 #if defined(_WIN32)
409  _aligned_free(ptr);
410 #else
411  // Retrieve the size and offset information from the metadata
412  size_t offset;
413  std::memcpy(&offset, static_cast<std::byte*>(ptr) - (2u * sizeof(size_t)), sizeof(offset));
414 
415  size_t total_size;
416  std::memcpy(&total_size, static_cast<std::byte*>(ptr) - sizeof(size_t), sizeof(total_size));
417 
418  // Calculate the original memory block address
419  void* mem = static_cast<std::byte*>(ptr) - offset;
420 
421  ::munmap(mem, total_size);
422 #endif
423  }
424 
425 private:
426  static constexpr integer_type QUILL_CACHE_LINE_MASK{QUILL_CACHE_LINE_SIZE - 1};
427 
428  integer_type const _capacity;
429  integer_type const _mask;
430  integer_type const _bytes_per_batch;
431  std::byte* const _storage{nullptr};
432  HugePagesPolicy const _huge_pages_policy;
433 
434  alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<integer_type> _atomic_writer_pos{0};
435  alignas(QUILL_CACHE_LINE_ALIGNED) integer_type _writer_pos{0};
436  integer_type _reader_pos_cache_plus_capacity{0};
437  integer_type _last_flushed_writer_pos{0};
438 
439  alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<integer_type> _atomic_reader_pos{0};
440  alignas(QUILL_CACHE_LINE_ALIGNED) integer_type _reader_pos{0};
441  mutable integer_type _writer_pos_cache{0};
442  integer_type _last_flushed_reader_pos{0};
443 };
444 
446 
447 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
448  #pragma warning(pop)
449 #endif
450 
451 } // namespace detail
452 
453 QUILL_END_NAMESPACE
A bounded single-producer, single-consumer ring buffer.
Definition: BoundedSPSCQueue.h:54
QUILL_ATTRIBUTE_HOT void finish_and_commit_write(integer_type n) noexcept
Finish and commit write as a single function.
Definition: BoundedSPSCQueue.h:165
QUILL_NODISCARD size_t next_power_of_two(size_t n) noexcept
Round up to the next power of 2.
Definition: MathUtilities.h:35
Definition: BoundedSPSCQueue.h:62
Sets up a signal handler to handle fatal signals.
Definition: BackendManager.h:28
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
Only meant to be called by the reader.
Definition: BoundedSPSCQueue.h:215
constexpr size_t QUILL_CACHE_LINE_SIZE
Cache line constants.
Definition: Common.h:67
Custom exception.
Definition: QuillError.h:47
QUILL_NODISCARD constexpr bool is_power_of_two(uint64_t number) noexcept
Check if a number is a power of 2.
Definition: MathUtilities.h:25