9 #include "quill/core/Attributes.h" 10 #include "quill/core/BoundedSPSCQueue.h" 11 #include "quill/core/Common.h" 12 #include "quill/core/QuillError.h" 24 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__) 26 #pragma warning(disable : 4324) 56 explicit Node(
size_t bounded_queue_capacity, HugePagesPolicy huge_pages_policy)
57 : bounded_queue(bounded_queue_capacity, huge_pages_policy)
62 std::atomic<Node*> next{
nullptr};
69 std::byte* write_buffer{
nullptr};
76 explicit ReadResult(std::byte* read_position) : read_pos(read_position) {}
79 size_t previous_capacity{0};
80 size_t new_capacity{0};
81 bool allocation{
false};
88 HugePagesPolicy huge_pages_policy = quill::HugePagesPolicy::Never)
89 : _max_capacity(max_capacity),
90 _producer(new Node(initial_bounded_queue_capacity, huge_pages_policy)),
107 Node
const* current_node = _consumer;
110 while (current_node !=
nullptr)
112 auto const to_delete = current_node;
113 current_node = current_node->next;
126 std::byte* write_pos = _producer->bounded_queue.prepare_write(nbytes);
128 if (QUILL_LIKELY(write_pos !=
nullptr))
133 return _handle_full_queue(nbytes);
136 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
WriteReservation prepare_write_reserve_cached(
size_t nbytes) noexcept
139 auto const reservation = bounded_queue->prepare_write_reserve_cached(nbytes);
140 return WriteReservation{reservation.write_buffer, reservation.writer_pos, bounded_queue};
149 _producer->bounded_queue.finish_write(nbytes);
155 QUILL_ATTRIBUTE_HOT
void commit_write() noexcept { _producer->bounded_queue.commit_write(); }
166 QUILL_ATTRIBUTE_HOT
void finish_and_commit_write_reservation(
size_t new_writer_pos) noexcept
168 _producer->bounded_queue.finish_and_commit_write_reservation(new_writer_pos);
178 return _producer->bounded_queue.capacity();
188 if (capacity > (_producer->bounded_queue.capacity() >> 1))
196 auto const next_node =
new Node{
capacity, _producer->bounded_queue.huge_pages_policy()};
199 _producer->next.store(next_node, std::memory_order_release);
202 _producer = next_node;
211 ReadResult read_result{_consumer->bounded_queue.prepare_read()};
213 if (read_result.read_pos !=
nullptr)
219 Node*
const next_node = _consumer->next.load(std::memory_order_acquire);
223 return _read_next_queue(next_node);
236 _consumer->bounded_queue.finish_read(nbytes);
242 QUILL_ATTRIBUTE_HOT
void commit_read() noexcept { _consumer->bounded_queue.commit_read(); }
249 QUILL_NODISCARD
size_t capacity() const noexcept {
return _consumer->bounded_queue.capacity(); }
256 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool empty() const noexcept
258 return _consumer->bounded_queue.empty() && (_consumer->next.load(std::memory_order_relaxed) ==
nullptr);
263 QUILL_NODISCARD std::byte* _handle_full_queue(
size_t nbytes)
266 size_t capacity = _producer->bounded_queue.capacity();
268 capacity = (capacity > ((SIZE_MAX / 2ull))) ? _max_capacity : capacity * 2ull;
270 while (capacity < nbytes)
272 if (QUILL_UNLIKELY(capacity > ((SIZE_MAX / 2ull))))
274 capacity = _max_capacity;
281 if (QUILL_UNLIKELY(capacity > _max_capacity))
283 if (nbytes > _max_capacity)
286 QuillError{
"Logging single messages larger than the configured maximum queue capacity " 288 "To log single messages exceeding this limit, consider increasing " 289 "FrontendOptions::unbounded_queue_max_capacity.\n" 291 std::to_string(nbytes) +
293 "Required queue capacity: " +
294 std::to_string(capacity) +
296 "Configured maximum queue capacity: " +
297 std::to_string(_max_capacity) +
" bytes"});
306 _producer->bounded_queue.commit_write();
309 auto const next_node =
new Node{
capacity, _producer->bounded_queue.huge_pages_policy()};
312 _producer->next.store(next_node, std::memory_order_release);
315 _producer = next_node;
318 std::byte*
const write_pos = _producer->bounded_queue.prepare_write(nbytes);
322 "write_pos is nullptr after allocating new node in UnboundedSPSCQueue::prepare_write()");
328 QUILL_NODISCARD
ReadResult _read_next_queue(Node* next_node)
333 ReadResult read_result{_consumer->bounded_queue.prepare_read()};
335 if (read_result.read_pos)
342 _consumer->bounded_queue.commit_read();
345 auto const previous_capacity = _consumer->bounded_queue.capacity();
348 _consumer = next_node;
349 read_result.read_pos = _consumer->bounded_queue.prepare_read();
352 read_result.allocation =
true;
353 read_result.new_capacity = _consumer->bounded_queue.capacity();
354 read_result.previous_capacity = previous_capacity;
360 size_t _max_capacity;
363 alignas(QUILL_CACHE_LINE_ALIGNED) Node* _producer{
nullptr};
364 alignas(QUILL_CACHE_LINE_ALIGNED) Node* _consumer{
nullptr};
367 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
A single-producer single-consumer FIFO circular buffer.
Definition: UnboundedSPSCQueue.h:43
~UnboundedSPSCQueue()
Destructor.
Definition: UnboundedSPSCQueue.h:104
Definition: UnboundedSPSCQueue.h:74
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte * prepare_write(size_t nbytes)
Reserve contiguous space for the producer without making it visible to the consumer.
Definition: UnboundedSPSCQueue.h:123
QUILL_NODISCARD size_t capacity() const noexcept
Return the capacity of the buffer the consumer is currently reading from.
Definition: UnboundedSPSCQueue.h:249
UnboundedSPSCQueue(size_t initial_bounded_queue_capacity, size_t max_capacity, HugePagesPolicy huge_pages_policy=quill::HugePagesPolicy::Never)
Constructor.
Definition: UnboundedSPSCQueue.h:87
QUILL_ATTRIBUTE_HOT void finish_write(size_t nbytes) noexcept
Complement to reserve producer space that makes nbytes starting from the return of reserve producer space visible to the consumer.
Definition: UnboundedSPSCQueue.h:147
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
Checks if the queue is empty.
Definition: UnboundedSPSCQueue.h:256
Sets up a signal handler to handle fatal signals.
Definition: BackendManager.h:28
QUILL_ATTRIBUTE_HOT void finish_read(size_t nbytes) noexcept
Consumes the next nbytes in the buffer and frees it back for the producer to reuse.
Definition: UnboundedSPSCQueue.h:234
QUILL_NODISCARD size_t producer_capacity() const noexcept
Return the capacity of the buffer the producer is currently writing to.
Definition: UnboundedSPSCQueue.h:176
Definition: UnboundedSPSCQueue.h:67
QUILL_ATTRIBUTE_HOT void commit_write() noexcept
Commit write to notify the consumer bytes are ready to read.
Definition: UnboundedSPSCQueue.h:155
Custom exception.
Definition: QuillError.h:47
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ReadResult prepare_read()
Prepare to read from the buffer.
Definition: UnboundedSPSCQueue.h:209
QUILL_ATTRIBUTE_HOT void commit_read() noexcept
Commit the read to indicate that the bytes are read and are now free to be reused.
Definition: UnboundedSPSCQueue.h:242
void shrink(size_t capacity)
Shrinks the queue if capacity is a valid smaller power of 2.
Definition: UnboundedSPSCQueue.h:186
QUILL_ATTRIBUTE_HOT void finish_and_commit_write(size_t nbytes) noexcept
Finish and commit write as a single function.
Definition: UnboundedSPSCQueue.h:160