quill
UnboundedSPSCQueue.h
1 
7 #pragma once
8 
9 #include "quill/core/Attributes.h"
10 #include "quill/core/BoundedSPSCQueue.h"
11 #include "quill/core/Common.h"
12 #include "quill/core/QuillError.h"
13 
14 #include <atomic>
15 #include <cstddef>
16 #include <cstdint>
17 #include <string>
18 
19 QUILL_BEGIN_NAMESPACE
20 
21 namespace detail
22 {
23 
24 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
25  #pragma warning(push)
26  #pragma warning(disable : 4324)
27 #endif
28 
44 {
45 private:
49  struct Node
50  {
56  explicit Node(size_t bounded_queue_capacity, HugePagesPolicy huge_pages_policy)
57  : bounded_queue(bounded_queue_capacity, huge_pages_policy)
58  {
59  }
60 
62  std::atomic<Node*> next{nullptr};
63  BoundedSPSCQueue bounded_queue;
64  };
65 
66 public:
68  {
69  std::byte* write_buffer{nullptr};
70  size_t writer_pos{0};
71  BoundedSPSCQueue* bounded_queue{nullptr};
72  };
73 
74  struct ReadResult
75  {
76  explicit ReadResult(std::byte* read_position) : read_pos(read_position) {}
77 
78  std::byte* read_pos;
79  size_t previous_capacity{0};
80  size_t new_capacity{0};
81  bool allocation{false};
82  };
83 
87  UnboundedSPSCQueue(size_t initial_bounded_queue_capacity, size_t max_capacity,
88  HugePagesPolicy huge_pages_policy = quill::HugePagesPolicy::Never)
89  : _max_capacity(max_capacity),
90  _producer(new Node(initial_bounded_queue_capacity, huge_pages_policy)),
91  _consumer(_producer)
92  {
93  }
94 
98  UnboundedSPSCQueue(UnboundedSPSCQueue const&) = delete;
99  UnboundedSPSCQueue& operator=(UnboundedSPSCQueue const&) = delete;
100 
105  {
106  // Get the current consumer node
107  Node const* current_node = _consumer;
108 
109  // Look for extra nodes to delete
110  while (current_node != nullptr)
111  {
112  auto const to_delete = current_node;
113  current_node = current_node->next;
114  delete to_delete;
115  }
116  }
117 
123  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_write(size_t nbytes)
124  {
125  // Try to reserve the bounded queue
126  std::byte* write_pos = _producer->bounded_queue.prepare_write(nbytes);
127 
128  if (QUILL_LIKELY(write_pos != nullptr))
129  {
130  return write_pos;
131  }
132 
133  return _handle_full_queue(nbytes);
134  }
135 
136  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT WriteReservation prepare_write_reserve_cached(size_t nbytes) noexcept
137  {
138  BoundedSPSCQueue* const bounded_queue = &_producer->bounded_queue;
139  auto const reservation = bounded_queue->prepare_write_reserve_cached(nbytes);
140  return WriteReservation{reservation.write_buffer, reservation.writer_pos, bounded_queue};
141  }
142 
147  QUILL_ATTRIBUTE_HOT void finish_write(size_t nbytes) noexcept
148  {
149  _producer->bounded_queue.finish_write(nbytes);
150  }
151 
155  QUILL_ATTRIBUTE_HOT void commit_write() noexcept { _producer->bounded_queue.commit_write(); }
156 
160  QUILL_ATTRIBUTE_HOT void finish_and_commit_write(size_t nbytes) noexcept
161  {
162  finish_write(nbytes);
163  commit_write();
164  }
165 
166  QUILL_ATTRIBUTE_HOT void finish_and_commit_write_reservation(size_t new_writer_pos) noexcept
167  {
168  _producer->bounded_queue.finish_and_commit_write_reservation(new_writer_pos);
169  }
170 
176  QUILL_NODISCARD size_t producer_capacity() const noexcept
177  {
178  return _producer->bounded_queue.capacity();
179  }
180 
186  void shrink(size_t capacity)
187  {
188  if (capacity > (_producer->bounded_queue.capacity() >> 1))
189  {
190  // We should only shrink if the new capacity is less or at least equal to the previous_power_of_2
191  return;
192  }
193 
194  // We want to shrink the queue, we will create a new queue with a smaller size
195  // the consumer will switch to the newer queue after emptying and deallocating the older queue
196  auto const next_node = new Node{capacity, _producer->bounded_queue.huge_pages_policy()};
197 
198  // store the new node pointer as next in the current node
199  _producer->next.store(next_node, std::memory_order_release);
200 
201  // producer is now using the next node
202  _producer = next_node;
203  }
204 
209  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ReadResult prepare_read()
210  {
211  ReadResult read_result{_consumer->bounded_queue.prepare_read()};
212 
213  if (read_result.read_pos != nullptr)
214  {
215  return read_result;
216  }
217 
218  // the buffer is empty check if another buffer exists
219  Node* const next_node = _consumer->next.load(std::memory_order_acquire);
220 
221  if (next_node)
222  {
223  return _read_next_queue(next_node);
224  }
225 
226  // Queue is empty and no new queue exists
227  return read_result;
228  }
229 
234  QUILL_ATTRIBUTE_HOT void finish_read(size_t nbytes) noexcept
235  {
236  _consumer->bounded_queue.finish_read(nbytes);
237  }
238 
242  QUILL_ATTRIBUTE_HOT void commit_read() noexcept { _consumer->bounded_queue.commit_read(); }
243 
249  QUILL_NODISCARD size_t capacity() const noexcept { return _consumer->bounded_queue.capacity(); }
250 
256  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
257  {
258  return _consumer->bounded_queue.empty() && (_consumer->next.load(std::memory_order_relaxed) == nullptr);
259  }
260 
261 private:
262  /***/
263  QUILL_NODISCARD std::byte* _handle_full_queue(size_t nbytes)
264  {
265  // Then it means the queue doesn't have enough size
266  size_t capacity = _producer->bounded_queue.capacity();
267 
268  capacity = (capacity > ((SIZE_MAX / 2ull))) ? _max_capacity : capacity * 2ull;
269 
270  while (capacity < nbytes)
271  {
272  if (QUILL_UNLIKELY(capacity > ((SIZE_MAX / 2ull))))
273  {
274  capacity = _max_capacity;
275  break;
276  }
277 
278  capacity *= 2ull;
279  }
280 
281  if (QUILL_UNLIKELY(capacity > _max_capacity))
282  {
283  if (nbytes > _max_capacity)
284  {
285  QUILL_THROW(
286  QuillError{"Logging single messages larger than the configured maximum queue capacity "
287  "is not possible.\n"
288  "To log single messages exceeding this limit, consider increasing "
289  "FrontendOptions::unbounded_queue_max_capacity.\n"
290  "Message size: " +
291  std::to_string(nbytes) +
292  " bytes\n"
293  "Required queue capacity: " +
294  std::to_string(capacity) +
295  " bytes\n"
296  "Configured maximum queue capacity: " +
297  std::to_string(_max_capacity) + " bytes"});
298  }
299 
300  // we reached the unbounded_queue_max_capacity we won't be allocating more
301  // instead return nullptr to block or drop
302  return nullptr;
303  }
304 
305  // commit previous write to the old queue before switching
306  _producer->bounded_queue.commit_write();
307 
308  // We failed to reserve because the queue was full, create a new node with a new queue
309  auto const next_node = new Node{capacity, _producer->bounded_queue.huge_pages_policy()};
310 
311  // store the new node pointer as next in the current node
312  _producer->next.store(next_node, std::memory_order_release);
313 
314  // producer is now using the next node
315  _producer = next_node;
316 
317  // reserve again, this time we know we will always succeed, cast to void* to ignore
318  std::byte* const write_pos = _producer->bounded_queue.prepare_write(nbytes);
319 
320  QUILL_ASSERT(
321  write_pos,
322  "write_pos is nullptr after allocating new node in UnboundedSPSCQueue::prepare_write()");
323 
324  return write_pos;
325  }
326 
327  /***/
328  QUILL_NODISCARD ReadResult _read_next_queue(Node* next_node)
329  {
330  // a new buffer was added by the producer, this happens only when we have allocated a new queue
331 
332  // try the existing buffer once more
333  ReadResult read_result{_consumer->bounded_queue.prepare_read()};
334 
335  if (read_result.read_pos)
336  {
337  return read_result;
338  }
339 
340  // Switch to the new buffer for reading
341  // commit the previous reads before deleting the queue
342  _consumer->bounded_queue.commit_read();
343 
344  // switch to the new buffer, existing one is deleted
345  auto const previous_capacity = _consumer->bounded_queue.capacity();
346  delete _consumer;
347 
348  _consumer = next_node;
349  read_result.read_pos = _consumer->bounded_queue.prepare_read();
350 
351  // we switched to a new here, so we store the capacity info to return it
352  read_result.allocation = true;
353  read_result.new_capacity = _consumer->bounded_queue.capacity();
354  read_result.previous_capacity = previous_capacity;
355 
356  return read_result;
357  }
358 
359 private:
360  size_t _max_capacity;
361 
363  alignas(QUILL_CACHE_LINE_ALIGNED) Node* _producer{nullptr};
364  alignas(QUILL_CACHE_LINE_ALIGNED) Node* _consumer{nullptr};
365 };
366 
367 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
368  #pragma warning(pop)
369 #endif
370 
371 } // namespace detail
372 
373 QUILL_END_NAMESPACE
A single-producer single-consumer FIFO circular buffer.
Definition: UnboundedSPSCQueue.h:43
~UnboundedSPSCQueue()
Destructor.
Definition: UnboundedSPSCQueue.h:104
Definition: UnboundedSPSCQueue.h:74
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte * prepare_write(size_t nbytes)
Reserve contiguous space for the producer without making it visible to the consumer.
Definition: UnboundedSPSCQueue.h:123
QUILL_NODISCARD size_t capacity() const noexcept
Return the current buffer&#39;s capacity.
Definition: UnboundedSPSCQueue.h:249
UnboundedSPSCQueue(size_t initial_bounded_queue_capacity, size_t max_capacity, HugePagesPolicy huge_pages_policy=quill::HugePagesPolicy::Never)
Constructor.
Definition: UnboundedSPSCQueue.h:87
QUILL_ATTRIBUTE_HOT void finish_write(size_t nbytes) noexcept
Complement to reserve producer space that makes nbytes starting from the return of reserve producer s...
Definition: UnboundedSPSCQueue.h:147
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
checks if the queue is empty
Definition: UnboundedSPSCQueue.h:256
Setups a signal handler to handle fatal signals.
Definition: BackendManager.h:28
QUILL_ATTRIBUTE_HOT void finish_read(size_t nbytes) noexcept
Consumes the next nbytes in the buffer and frees it back for the producer to reuse.
Definition: UnboundedSPSCQueue.h:234
QUILL_NODISCARD size_t producer_capacity() const noexcept
Return the current buffer&#39;s capacity.
Definition: UnboundedSPSCQueue.h:176
Definition: UnboundedSPSCQueue.h:67
QUILL_ATTRIBUTE_HOT void commit_write() noexcept
Commit write to notify the consumer bytes are ready to read.
Definition: UnboundedSPSCQueue.h:155
custom exception
Definition: QuillError.h:47
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ReadResult prepare_read()
Prepare to read from the buffer.
Definition: UnboundedSPSCQueue.h:209
QUILL_ATTRIBUTE_HOT void commit_read() noexcept
Commit the read to indicate that the bytes are read and are now free to be reused.
Definition: UnboundedSPSCQueue.h:242
void shrink(size_t capacity)
Shrinks the queue if capacity is a valid smaller power of 2.
Definition: UnboundedSPSCQueue.h:186
QUILL_ATTRIBUTE_HOT void finish_and_commit_write(size_t nbytes) noexcept
Finish and commit write as a single function.
Definition: UnboundedSPSCQueue.h:160