quill
ThreadContextManager.h
1 
7 #pragma once
8 
9 #include "quill/core/Attributes.h"
10 #include "quill/core/BoundedSPSCQueue.h"
11 #include "quill/core/Common.h"
12 #include "quill/core/InlinedVector.h"
13 #include "quill/core/Spinlock.h"
14 #include "quill/core/UnboundedSPSCQueue.h"
15 
16 #include <atomic>
17 #include <cstdint>
18 #include <cstdlib>
19 #include <memory>
20 #include <new>
21 #include <string>
22 #include <string_view>
23 #include <type_traits>
24 #include <vector>
25 
26 QUILL_BEGIN_NAMESPACE
27 
28 namespace detail
29 {
30 
31 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
32  #pragma warning(push)
33  #pragma warning(disable : 4324)
34 #endif
35 
37 class TransitEventBuffer;
38 class BackendWorker;
39 class BackendMdcState;
40 
41 #if defined(__GNUC__) || defined(__clang__) || defined(__MINGW32__)
42  #pragma GCC diagnostic push
43  #pragma GCC diagnostic ignored "-Wredundant-decls"
44 #endif
45 
47 QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED extern std::string get_thread_name();
48 QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED extern uint32_t get_thread_id() noexcept;
49 
50 #if defined(__GNUC__) || defined(__clang__) || defined(__MINGW32__)
51  #pragma GCC diagnostic pop
52 #endif
53 
55 {
56 private:
57  union SpscQueueUnion
58  {
59  UnboundedSPSCQueue unbounded_spsc_queue;
60  BoundedSPSCQueue bounded_spsc_queue;
61 
62  SpscQueueUnion() {}
63  ~SpscQueueUnion() {}
64  };
65 
66 public:
67  /***/
68  ThreadContext(QueueType queue_type, size_t initial_queue_capacity,
69  QUILL_MAYBE_UNUSED size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy)
70  : _queue_type(queue_type)
71  {
72  if (has_unbounded_queue_type())
73  {
74  new (&_spsc_queue_union.unbounded_spsc_queue)
75  UnboundedSPSCQueue{initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy};
76  }
77  else if (has_bounded_queue_type())
78  {
79  new (&_spsc_queue_union.bounded_spsc_queue) BoundedSPSCQueue{initial_queue_capacity, huge_pages_policy};
80  }
81  }
82 
83  /***/
84  ThreadContext(ThreadContext const&) = delete;
85  ThreadContext& operator=(ThreadContext const&) = delete;
86 
87  /***/
88  ~ThreadContext()
89  {
90  if (has_unbounded_queue_type())
91  {
92  _spsc_queue_union.unbounded_spsc_queue.~UnboundedSPSCQueue();
93  }
94  else if (has_bounded_queue_type())
95  {
96  _spsc_queue_union.bounded_spsc_queue.~BoundedSPSCQueue();
97  }
98  }
99 
100  /***/
101  template <QueueType queue_type>
102  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping), UnboundedSPSCQueue, BoundedSPSCQueue>& get_spsc_queue() noexcept
103  {
104  QUILL_ASSERT(_queue_type == queue_type, "ThreadContext queue_type mismatch in get_spsc_queue()");
105 
106  if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
107  {
108  return _spsc_queue_union.unbounded_spsc_queue;
109  }
110  else
111  {
112  return _spsc_queue_union.bounded_spsc_queue;
113  }
114  }
115 
116  /***/
117  template <QueueType queue_type>
118  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping), UnboundedSPSCQueue, BoundedSPSCQueue> const& get_spsc_queue()
119  const noexcept
120  {
121  QUILL_ASSERT(_queue_type == queue_type,
122  "ThreadContext queue_type mismatch in get_spsc_queue() const");
123 
124  if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
125  {
126  return _spsc_queue_union.unbounded_spsc_queue;
127  }
128  else
129  {
130  return _spsc_queue_union.bounded_spsc_queue;
131  }
132  }
133 
134  /***/
135  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SizeCacheVector& get_conditional_arg_size_cache() noexcept
136  {
137  return _conditional_arg_size_cache;
138  }
139 
140  /***/
141  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_bounded_queue_type() const noexcept
142  {
143  return (_queue_type == QueueType::BoundedBlocking) || (_queue_type == QueueType::BoundedDropping);
144  }
145 
146  /***/
147  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_unbounded_queue_type() const noexcept
148  {
149  return (_queue_type == QueueType::UnboundedBlocking) || (_queue_type == QueueType::UnboundedDropping);
150  }
151 
152  /***/
153  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_dropping_queue() const noexcept
154  {
155  return (_queue_type == QueueType::UnboundedDropping) || (_queue_type == QueueType::BoundedDropping);
156  }
157 
158  /***/
159  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_blocking_queue() const noexcept
160  {
161  return (_queue_type == QueueType::UnboundedBlocking) || (_queue_type == QueueType::BoundedBlocking);
162  }
163 
164  /***/
165  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SpscQueueUnion const& get_spsc_queue_union() const noexcept
166  {
167  return _spsc_queue_union;
168  }
169 
170  /***/
171  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SpscQueueUnion& get_spsc_queue_union() noexcept
172  {
173  return _spsc_queue_union;
174  }
175 
176  /***/
177  QUILL_NODISCARD std::string_view thread_id() const noexcept { return _thread_id; }
178 
179  /***/
180  QUILL_NODISCARD std::string_view thread_name() const noexcept { return _thread_name; }
181 
182  /***/
183  void mark_invalid() noexcept
184  {
185  // Publish thread teardown after any final queue updates so the backend can safely
186  // treat `!is_valid()` as the start of the final drain-and-remove sequence.
187  _valid.store(false, std::memory_order_release);
188  }
189 
190  /***/
191  QUILL_NODISCARD bool is_valid() const noexcept { return _valid.load(std::memory_order_acquire); }
192 
193  /***/
194  void increment_failure_counter() noexcept
195  {
196  _failure_counter.fetch_add(1, std::memory_order_relaxed);
197  }
198 
199  /***/
200  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT size_t get_and_reset_failure_counter() noexcept
201  {
202  if (QUILL_LIKELY(_failure_counter.load(std::memory_order_relaxed) == 0))
203  {
204  return 0;
205  }
206  return _failure_counter.exchange(0, std::memory_order_relaxed);
207  }
208 
209 private:
210  friend class detail::BackendWorker;
211 
212  SpscQueueUnion _spsc_queue_union;
213  SizeCacheVector _conditional_arg_size_cache;
214  std::string _thread_id = std::to_string(get_thread_id());
215  std::string _thread_name = get_thread_name();
216  std::shared_ptr<TransitEventBuffer> _transit_event_buffer;
217  std::shared_ptr<BackendMdcState> _backend_mdc_state;
218  QueueType _queue_type;
219  std::atomic<bool> _valid{true};
220  alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<size_t> _failure_counter{0};
221 };
222 
224 {
225 public:
226  /***/
227  QUILL_EXPORT static ThreadContextManager& instance() noexcept
228  {
229  static ThreadContextManager instance;
230  return instance;
231  }
232 
233  /***/
235  ThreadContextManager& operator=(ThreadContextManager const&) = delete;
236 
237  /***/
238  template <typename TCallback>
239  void for_each_thread_context(TCallback cb)
240  {
241  LockGuard const lock{_spinlock};
242 
243  for (auto const& elem : _thread_contexts)
244  {
245  cb(elem.get());
246  }
247  }
248 
249  /***/
250  void register_thread_context(std::shared_ptr<ThreadContext> const& thread_context)
251  {
252  LockGuard const lock{_spinlock};
253  _thread_contexts.push_back(thread_context);
254  _new_thread_context_flag.store(true, std::memory_order_release);
255  }
256 
257  /***/
258  void add_invalid_thread_context() noexcept
259  {
260  _invalid_thread_context_count.fetch_add(1, std::memory_order_relaxed);
261  }
262 
263  /***/
264  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_invalid_thread_context() const noexcept
265  {
266  // Relaxed is sufficient here: a non-zero count just tells the backend to check
267  // individual ThreadContext validity flags (which use acquire ordering) and drain
268  // their queues. If we miss an update we will recheck on the next iteration
269  return _invalid_thread_context_count.load(std::memory_order_relaxed) != 0;
270  }
271 
272  /***/
273  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool new_thread_context_flag() noexcept
274  {
275  // Again relaxed memory model as in case it is false we will acquire the lock
276  if (_new_thread_context_flag.load(std::memory_order_relaxed))
277  {
278  // if the variable was updated to true, set it to false,
279  // There should not be any race condition here as this is the only place _changed is set to
280  // false, and we will return true anyway
281  _new_thread_context_flag.store(false, std::memory_order_relaxed);
282  return true;
283  }
284  return false;
285  }
286 
287  /***/
288  void remove_shared_invalidated_thread_context(ThreadContext const* thread_context)
289  {
290  LockGuard const lock{_spinlock};
291 
292  // We could use std::find_if, but since this header is included in Logger.h, which is essential
293  // for logging purposes, we aim to minimize the number of includes in that path.
294  // Therefore, we implement our own find_if loop here.
295  auto thread_context_it = _thread_contexts.end();
296  for (auto it = _thread_contexts.begin(); it != _thread_contexts.end(); ++it)
297  {
298  if (it->get() == thread_context)
299  {
300  thread_context_it = it;
301  break;
302  }
303  }
304 
305  QUILL_ASSERT(thread_context_it != _thread_contexts.end(),
306  "Attempting to remove a non-existent thread context in "
307  "ThreadContextManager::unregister_thread_context()");
308 
309  QUILL_ASSERT(!thread_context_it->get()->is_valid(),
310  "Attempting to remove a valid thread context in "
311  "ThreadContextManager::unregister_thread_context()");
312 
313  QUILL_ASSERT(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type(),
314  "Invalid queue type in ThreadContextManager::unregister_thread_context()");
315 
316  if (thread_context->has_unbounded_queue_type())
317  {
318  QUILL_ASSERT(thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty(),
319  "Attempting to remove a thread context with a non-empty unbounded queue in "
320  "ThreadContextManager::unregister_thread_context()");
321  }
322  else if (thread_context->has_bounded_queue_type())
323  {
324  QUILL_ASSERT(thread_context->get_spsc_queue_union().bounded_spsc_queue.empty(),
325  "Attempting to remove a thread context with a non-empty bounded queue in "
326  "ThreadContextManager::unregister_thread_context()");
327  }
328 
329  _thread_contexts.erase(thread_context_it);
330 
331  // Decrement the counter since we found something to
332  QUILL_ASSERT(_invalid_thread_context_count.load(std::memory_order_relaxed) != 0,
333  "_invalid_thread_context_count underflow in "
334  "ThreadContextManager::unregister_thread_context()");
335  _invalid_thread_context_count.fetch_sub(1, std::memory_order_relaxed);
336  }
337 
338 private:
339  ThreadContextManager() = default;
340  ~ThreadContextManager() = default;
341 
342 private:
343  std::vector<std::shared_ptr<ThreadContext>> _thread_contexts;
344  Spinlock _spinlock;
345  std::atomic<bool> _new_thread_context_flag{false};
346  std::atomic<uint32_t> _invalid_thread_context_count{0};
347 };
348 
350 {
351 public:
352  /***/
353  ScopedThreadContext(QueueType queue_type, size_t initial_queue_capacity,
354  size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy)
355  : _thread_context(std::make_shared<ThreadContext>(
356  queue_type, initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy))
357  {
358 #if defined(QUILL_ENABLE_ASSERTIONS) || !defined(NDEBUG)
359  // Thread-local flag to track if an instance has been created for this thread.
360  // This ensures that get_local_thread_context() is not called with different template arguments
361  // when using custom FrontendOptions. Having multiple thread contexts in a single thread is fine
362  // and functional but goes against the design principle of maintaining a single thread context
363  // per thread.
364  thread_local bool thread_local_instance_created = false;
365 
366  QUILL_ASSERT(!thread_local_instance_created,
367  R"(ScopedThreadContext can only be instantiated once per thread. It appears you may be combining default FrontendOptions with custom FrontendOptions. Ensure only one set of FrontendOptions is used to maintain a single thread context per thread.)");
368 
369  thread_local_instance_created = true;
370 #endif
371 
372  ThreadContextManager::instance().register_thread_context(_thread_context);
373  }
374 
375  /***/
376  ScopedThreadContext(ScopedThreadContext const&) = delete;
377  ScopedThreadContext& operator=(ScopedThreadContext const&) = delete;
378 
379  /***/
380  ~ScopedThreadContext() noexcept
381  {
382  // This destructor will get called when the thread that created this wrapper stops
383  // we will only invalidate the thread context
384  // The backend thread will empty an invalidated ThreadContext and then remove_file it from
385  // the ThreadContextCollection
386  // There is only exception for the thread who owns the ThreadContextCollection the
387  // main thread. The thread context of the main thread can get deleted before getting invalidated
388  _thread_context->mark_invalid();
389 
390  // Notify the backend thread that one context has been removed
391  ThreadContextManager::instance().add_invalid_thread_context();
392  }
393 
394  /***/
395  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ThreadContext* get_thread_context() const noexcept
396  {
397  QUILL_ASSERT(_thread_context,
398  "_thread_context cannot be null in ScopedThreadContext::get_thread_context()");
399  return _thread_context.get();
400  }
401 
402 private:
407  std::shared_ptr<ThreadContext> _thread_context;
408 };
409 
414 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT QUILL_EXPORT inline ThreadContext* get_scoped_thread_context_impl(
415  QueueType queue_type, size_t initial_queue_capacity, size_t unbounded_queue_max_capacity,
416  HugePagesPolicy huge_pages_policy)
417 {
418  thread_local ScopedThreadContext scoped_thread_context{
419  queue_type, initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy};
420 
421  return scoped_thread_context.get_thread_context();
422 }
423 
424 /***/
425 template <typename TFrontendOptions>
426 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ThreadContext* get_local_thread_context()
427 {
429  TFrontendOptions::queue_type, TFrontendOptions::initial_queue_capacity,
430  TFrontendOptions::unbounded_queue_max_capacity, TFrontendOptions::huge_pages_policy);
431 }
432 
433 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
434  #pragma warning(pop)
435 #endif
436 
437 } // namespace detail
438 
439 QUILL_END_NAMESPACE
A single-producer single-consumer FIFO circular buffer.
Definition: UnboundedSPSCQueue.h:43
Definition: ThreadContextManager.h:349
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT QUILL_EXPORT ThreadContext * get_scoped_thread_context_impl(QueueType queue_type, size_t initial_queue_capacity, size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy)
Non-template implementation that owns the thread local context.
Definition: ThreadContextManager.h:414
Definition: ThreadContextManager.h:223
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED std::string get_thread_name()
Returns the name of the thread.
Definition: ThreadUtilities.h:150
Setups a signal handler to handle fatal signals.
Definition: BackendManager.h:28
Definition: Spinlock.h:18
Definition: Spinlock.h:58
Definition: BackendWorker.h:80
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED uint32_t get_thread_id() noexcept
Returns the os assigned ID of the thread.
Definition: ThreadUtilities.h:213
Definition: ThreadContextManager.h:54