3 #include "quill/core/Attributes.h" 4 #include "quill/core/Common.h" 5 #include "quill/core/MathUtilities.h" 6 #include "quill/core/QuillError.h" 21 #if defined(QUILL_X86ARCH) 25 #if __has_include(<x86gprintrin.h>) 26 #if defined(__GNUC__) && __GNUC__ > 10 27 #include <emmintrin.h> 28 #include <x86gprintrin.h> 29 #elif defined(__clang_major__) 31 #include <immintrin.h> 34 #include <immintrin.h> 35 #include <x86intrin.h> 45 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__) 47 #pragma warning(disable : 4324) 57 using integer_type = T;
59 static_assert(integer_type{0} <
static_cast<integer_type
>(~integer_type{0}),
60 "BoundedSPSCQueueImpl integer_type must be unsigned");
64 std::byte* write_buffer{
nullptr};
65 integer_type writer_pos{0};
70 HugePagesPolicy huge_pages_policy = HugePagesPolicy::Never,
71 integer_type reader_store_percent = 5)
72 : _capacity(_validate_capacity(capacity)),
74 _bytes_per_batch(static_cast<integer_type>(
75 (static_cast<double>(_capacity) * static_cast<double>(reader_store_percent)) / 100.0)),
76 _storage(static_cast<std::byte*>(_alloc_aligned(
77 2u * static_cast<size_t>(_capacity), QUILL_CACHE_LINE_ALIGNED, huge_pages_policy))),
78 _huge_pages_policy(huge_pages_policy)
80 size_t const storage_size = 2u *
static_cast<size_t>(_capacity);
81 std::memset(_storage, 0, storage_size);
83 _atomic_writer_pos.store(0);
84 _atomic_reader_pos.store(0);
87 _reader_pos_cache_plus_capacity = _capacity;
89 #if defined(QUILL_X86ARCH) 93 _mm_clflush(_storage + i);
96 uint64_t
const cache_lines = (_capacity >= 2048) ? 32 : 16;
98 for (uint64_t i = 0; i < cache_lines; ++i)
105 ~BoundedSPSCQueueImpl() { _free_aligned(_storage); }
110 BoundedSPSCQueueImpl(BoundedSPSCQueueImpl
const&) =
delete;
111 BoundedSPSCQueueImpl& operator=(BoundedSPSCQueueImpl
const&) =
delete;
113 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_write(integer_type n) noexcept
115 if (static_cast<integer_type>(_reader_pos_cache_plus_capacity - _writer_pos) < n)
118 _reader_pos_cache_plus_capacity = _atomic_reader_pos.load(std::memory_order_acquire) + _capacity;
120 if (static_cast<integer_type>(_reader_pos_cache_plus_capacity - _writer_pos) < n)
126 return _storage + (_writer_pos & _mask);
129 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
WriteReservation prepare_write_reserve_cached(integer_type n) noexcept
131 integer_type
const writer_pos = _writer_pos;
133 if (QUILL_UNLIKELY(static_cast<integer_type>(_reader_pos_cache_plus_capacity - writer_pos) < n))
140 QUILL_ASSUME(_storage !=
nullptr);
145 QUILL_ATTRIBUTE_HOT
void finish_write(integer_type n) noexcept { _writer_pos += n; }
147 QUILL_ATTRIBUTE_HOT
void commit_write() noexcept
150 _atomic_writer_pos.store(_writer_pos, std::memory_order_release);
152 #if defined(QUILL_X86ARCH) 154 _flush_cachelines(_last_flushed_writer_pos, _writer_pos);
158 reinterpret_cast<char const*>(_storage + ((_writer_pos +
QUILL_CACHE_LINE_SIZE * 10) & _mask)), _MM_HINT_T0);
167 finish_and_commit_write_reservation(_writer_pos + n);
170 QUILL_ATTRIBUTE_HOT
void finish_and_commit_write_reservation(integer_type new_writer_pos) noexcept
172 _writer_pos = new_writer_pos;
175 _atomic_writer_pos.store(new_writer_pos, std::memory_order_release);
177 #if defined(QUILL_X86ARCH) 179 _flush_cachelines(_last_flushed_writer_pos, new_writer_pos);
183 reinterpret_cast<char const*>(_storage + ((new_writer_pos +
QUILL_CACHE_LINE_SIZE * 10) & _mask)), _MM_HINT_T0);
187 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_read() noexcept
194 return _storage + (_reader_pos & _mask);
197 QUILL_ATTRIBUTE_HOT
void finish_read(integer_type n) noexcept { _reader_pos += n; }
199 QUILL_ATTRIBUTE_HOT
void commit_read() noexcept
201 if (static_cast<integer_type>(_reader_pos - _atomic_reader_pos.load(std::memory_order_relaxed)) >= _bytes_per_batch)
203 _atomic_reader_pos.store(_reader_pos, std::memory_order_release);
205 #if defined(QUILL_X86ARCH) 206 _flush_cachelines(_last_flushed_reader_pos, _reader_pos);
215 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool empty() const noexcept
217 if (_writer_pos_cache == _reader_pos)
220 _writer_pos_cache = _atomic_writer_pos.load(std::memory_order_acquire);
222 if (_writer_pos_cache == _reader_pos)
231 QUILL_NODISCARD integer_type capacity()
const noexcept
233 return static_cast<integer_type
>(_capacity);
236 QUILL_NODISCARD HugePagesPolicy huge_pages_policy()
const noexcept {
return _huge_pages_policy; }
239 #if defined(QUILL_X86ARCH) 240 QUILL_ATTRIBUTE_HOT
void _flush_cachelines(integer_type& last, integer_type offset)
242 integer_type last_diff = last - (last & QUILL_CACHE_LINE_MASK);
243 integer_type
const cur_diff = offset - (offset & QUILL_CACHE_LINE_MASK);
245 if (cur_diff > last_diff)
249 _mm_clflushopt(_storage + (last_diff & _mask));
251 }
while (cur_diff > last_diff);
264 QUILL_NODISCARD
static std::byte* _align_pointer(
void* pointer,
size_t alignment) noexcept
267 "alignment must be a power of two in BoundedSPSCQueue::_align_pointer()");
268 return reinterpret_cast<std::byte*
>((
reinterpret_cast<uintptr_t
>(pointer) + (alignment - 1ul)) &
288 static integer_type _validate_capacity(QUILL_MAYBE_UNUSED integer_type capacity)
290 #if defined(QUILL_X86ARCH) 293 QUILL_THROW(
QuillError{
"Capacity must be at least 1024"});
297 size_t const rounded_capacity =
next_power_of_two(static_cast<size_t>(capacity));
299 constexpr
auto max_integer_capacity = []() constexpr
302 if constexpr (
sizeof(integer_type) >=
sizeof(
size_t))
306 else if constexpr (
sizeof(integer_type) <=
sizeof(uint8_t))
308 return static_cast<size_t>(UINT8_MAX);
310 else if constexpr (
sizeof(integer_type) <=
sizeof(uint16_t))
312 return static_cast<size_t>(UINT16_MAX);
314 else if constexpr (
sizeof(integer_type) <=
sizeof(uint32_t))
316 return static_cast<size_t>(UINT32_MAX);
320 return static_cast<size_t>(UINT64_MAX);
324 if (QUILL_UNLIKELY(rounded_capacity > max_integer_capacity))
326 QUILL_THROW(
QuillError{
"BoundedSPSCQueue capacity exceeds the queue position type"});
332 constexpr
size_t metadata_size{2u *
sizeof(size_t)};
333 constexpr
size_t max_rounded_capacity = (SIZE_MAX - metadata_size - QUILL_CACHE_LINE_ALIGNED) / 2u;
335 if (QUILL_UNLIKELY(rounded_capacity > max_rounded_capacity))
337 QUILL_THROW(
QuillError{
"BoundedSPSCQueue capacity is too large"});
340 return static_cast<integer_type
>(rounded_capacity);
343 QUILL_NODISCARD
static void* _alloc_aligned(
size_t size,
size_t alignment,
344 QUILL_MAYBE_UNUSED HugePagesPolicy huge_pages_policy)
347 void* p = _aligned_malloc(size, alignment);
351 QUILL_THROW(
QuillError{std::string{
"_alloc_aligned failed with error message errno: "} +
352 std::to_string(errno)});
358 constexpr
size_t metadata_size{2u *
sizeof(size_t)};
359 size_t const total_size{size + metadata_size + alignment};
362 int flags = MAP_PRIVATE | MAP_ANONYMOUS;
364 #if defined(__linux__) 365 if (huge_pages_policy != HugePagesPolicy::Never)
367 flags |= MAP_HUGETLB;
371 void* mem = ::mmap(
nullptr, total_size, PROT_READ | PROT_WRITE, flags, -1, 0);
373 #if defined(__linux__) 374 if ((mem == MAP_FAILED) && (huge_pages_policy == HugePagesPolicy::Try))
377 flags &= ~MAP_HUGETLB;
378 mem = ::mmap(
nullptr, total_size, PROT_READ | PROT_WRITE, flags, -1, 0);
382 if (mem == MAP_FAILED)
384 QUILL_THROW(
QuillError{std::string{
"mmap failed. errno: "} + std::to_string(errno) +
385 " error: " + std::strerror(errno)});
389 std::byte* aligned_address = _align_pointer(static_cast<std::byte*>(mem) + metadata_size, alignment);
392 auto const offset =
static_cast<size_t>(aligned_address -
static_cast<std::byte*
>(mem));
395 std::memcpy(aligned_address -
sizeof(
size_t), &total_size,
sizeof(total_size));
396 std::memcpy(aligned_address - (2u *
sizeof(
size_t)), &offset,
sizeof(offset));
398 return aligned_address;
406 void static _free_aligned(
void* ptr) noexcept
413 std::memcpy(&offset, static_cast<std::byte*>(ptr) - (2u *
sizeof(
size_t)),
sizeof(offset));
416 std::memcpy(&total_size, static_cast<std::byte*>(ptr) -
sizeof(
size_t),
sizeof(total_size));
419 void* mem =
static_cast<std::byte*
>(ptr) - offset;
421 ::munmap(mem, total_size);
// Immutable configuration, fixed at construction.
// Queue size in bytes as returned by _validate_capacity() (rounded up to a power of two).
428 integer_type
const _capacity;
// Bit mask applied to the free-running positions (`pos & _mask`) to get a buffer offset.
// NOTE(review): its initializer is not visible here; presumably _capacity - 1 — confirm.
429 integer_type
const _mask;
// reader_store_percent of _capacity; commit_read() publishes the reader position only after
// at least this many bytes have been consumed since the last publication.
430 integer_type
const _bytes_per_batch;
// Buffer of 2 * _capacity bytes allocated by _alloc_aligned() and released by _free_aligned().
431 std::byte*
const _storage{
nullptr};
432 HugePagesPolicy
const _huge_pages_policy;
// Producer-side state. alignas keeps it on separate cache lines from the consumer-side fields
// below; declaration order defines this layout.
// Published write position: stored with release in commit_write(), loaded with acquire in empty().
434 alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<integer_type> _atomic_writer_pos{0};
// Producer-local write position, advanced by finish_write().
435 alignas(QUILL_CACHE_LINE_ALIGNED) integer_type _writer_pos{0};
// Last observed reader position plus _capacity; refreshed only when prepare_write() runs short of space.
436 integer_type _reader_pos_cache_plus_capacity{0};
// x86 only: position up to which the producer has issued cache-line flushes.
437 integer_type _last_flushed_writer_pos{0};
// Consumer-side state.
// Published read position: stored with release in commit_read(), loaded with acquire by prepare_write().
439 alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<integer_type> _atomic_reader_pos{0};
// Consumer-local read position, advanced by finish_read().
440 alignas(QUILL_CACHE_LINE_ALIGNED) integer_type _reader_pos{0};
// Reader's cached copy of _atomic_writer_pos; mutable so the const empty() can refresh it.
441 mutable integer_type _writer_pos_cache{0};
// x86 only: position up to which the consumer has issued cache-line flushes.
442 integer_type _last_flushed_reader_pos{0};
447 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__) A bounded single producer single consumer ring buffer.
Definition: BoundedSPSCQueue.h:54
QUILL_ATTRIBUTE_HOT void finish_and_commit_write(integer_type n) noexcept
Finish and commit write as a single function.
Definition: BoundedSPSCQueue.h:165
QUILL_NODISCARD size_t next_power_of_two(size_t n) noexcept
Round up to the next power of 2.
Definition: MathUtilities.h:35
Definition: BoundedSPSCQueue.h:62
Sets up a signal handler to handle fatal signals.
Definition: BackendManager.h:28
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
Only meant to be called by the reader.
Definition: BoundedSPSCQueue.h:215
constexpr size_t QUILL_CACHE_LINE_SIZE
Cache line constants.
Definition: Common.h:67
custom exception
Definition: QuillError.h:47
QUILL_NODISCARD constexpr bool is_power_of_two(uint64_t number) noexcept
Check if a number is a power of 2.
Definition: MathUtilities.h:25