9 #include "quill/core/Attributes.h" 10 #include "quill/core/BoundedSPSCQueue.h" 11 #include "quill/core/Common.h" 12 #include "quill/core/InlinedVector.h" 13 #include "quill/core/Spinlock.h" 14 #include "quill/core/UnboundedSPSCQueue.h" 22 #include <string_view> 23 #include <type_traits> 31 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__) 33 #pragma warning(disable : 4324) 37 class TransitEventBuffer;
39 class BackendMdcState;
41 #if defined(__GNUC__) || defined(__clang__) || defined(__MINGW32__) 42 #pragma GCC diagnostic push 43 #pragma GCC diagnostic ignored "-Wredundant-decls" 47 QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED
extern std::string
get_thread_name();
48 QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED
extern uint32_t
get_thread_id() noexcept;
50 #if defined(__GNUC__) || defined(__clang__) || defined(__MINGW32__) 51 #pragma GCC diagnostic pop 68 ThreadContext(QueueType queue_type,
size_t initial_queue_capacity,
69 QUILL_MAYBE_UNUSED
size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy)
70 : _queue_type(queue_type)
72 if (has_unbounded_queue_type())
74 new (&_spsc_queue_union.unbounded_spsc_queue)
75 UnboundedSPSCQueue{initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy};
77 else if (has_bounded_queue_type())
79 new (&_spsc_queue_union.bounded_spsc_queue)
BoundedSPSCQueue{initial_queue_capacity, huge_pages_policy};
90 if (has_unbounded_queue_type())
92 _spsc_queue_union.unbounded_spsc_queue.~UnboundedSPSCQueue();
94 else if (has_bounded_queue_type())
96 _spsc_queue_union.bounded_spsc_queue.~BoundedSPSCQueue();
101 template <QueueType queue_type>
102 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping), UnboundedSPSCQueue, BoundedSPSCQueue>& get_spsc_queue() noexcept
104 QUILL_ASSERT(_queue_type == queue_type,
"ThreadContext queue_type mismatch in get_spsc_queue()");
106 if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
108 return _spsc_queue_union.unbounded_spsc_queue;
112 return _spsc_queue_union.bounded_spsc_queue;
117 template <QueueType queue_type>
118 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping), UnboundedSPSCQueue, BoundedSPSCQueue>
const& get_spsc_queue()
121 QUILL_ASSERT(_queue_type == queue_type,
122 "ThreadContext queue_type mismatch in get_spsc_queue() const");
124 if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
126 return _spsc_queue_union.unbounded_spsc_queue;
130 return _spsc_queue_union.bounded_spsc_queue;
135 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
SizeCacheVector& get_conditional_arg_size_cache() noexcept
137 return _conditional_arg_size_cache;
141 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_bounded_queue_type()
const noexcept
143 return (_queue_type == QueueType::BoundedBlocking) || (_queue_type == QueueType::BoundedDropping);
147 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_unbounded_queue_type()
const noexcept
149 return (_queue_type == QueueType::UnboundedBlocking) || (_queue_type == QueueType::UnboundedDropping);
153 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_dropping_queue()
const noexcept
155 return (_queue_type == QueueType::UnboundedDropping) || (_queue_type == QueueType::BoundedDropping);
159 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_blocking_queue()
const noexcept
161 return (_queue_type == QueueType::UnboundedBlocking) || (_queue_type == QueueType::BoundedBlocking);
165 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SpscQueueUnion
const& get_spsc_queue_union()
const noexcept
167 return _spsc_queue_union;
171 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SpscQueueUnion& get_spsc_queue_union() noexcept
173 return _spsc_queue_union;
177 QUILL_NODISCARD std::string_view thread_id()
const noexcept {
return _thread_id; }
180 QUILL_NODISCARD std::string_view thread_name()
const noexcept {
return _thread_name; }
183 void mark_invalid() noexcept
187 _valid.store(
false, std::memory_order_release);
191 QUILL_NODISCARD
bool is_valid()
const noexcept {
return _valid.load(std::memory_order_acquire); }
194 void increment_failure_counter() noexcept
196 _failure_counter.fetch_add(1, std::memory_order_relaxed);
200 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
size_t get_and_reset_failure_counter() noexcept
202 if (QUILL_LIKELY(_failure_counter.load(std::memory_order_relaxed) == 0))
206 return _failure_counter.exchange(0, std::memory_order_relaxed);
212 SpscQueueUnion _spsc_queue_union;
216 std::shared_ptr<TransitEventBuffer> _transit_event_buffer;
217 std::shared_ptr<BackendMdcState> _backend_mdc_state;
218 QueueType _queue_type;
219 std::atomic<bool> _valid{
true};
220 alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<size_t> _failure_counter{0};
238 template <
typename TCallback>
239 void for_each_thread_context(TCallback cb)
243 for (
auto const& elem : _thread_contexts)
250 void register_thread_context(std::shared_ptr<ThreadContext>
const& thread_context)
253 _thread_contexts.push_back(thread_context);
254 _new_thread_context_flag.store(
true, std::memory_order_release);
258 void add_invalid_thread_context() noexcept
260 _invalid_thread_context_count.fetch_add(1, std::memory_order_relaxed);
264 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_invalid_thread_context()
const noexcept
269 return _invalid_thread_context_count.load(std::memory_order_relaxed) != 0;
273 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool new_thread_context_flag() noexcept
276 if (_new_thread_context_flag.load(std::memory_order_relaxed))
281 _new_thread_context_flag.store(
false, std::memory_order_relaxed);
288 void remove_shared_invalidated_thread_context(
ThreadContext const* thread_context)
295 auto thread_context_it = _thread_contexts.end();
296 for (
auto it = _thread_contexts.begin(); it != _thread_contexts.end(); ++it)
298 if (it->get() == thread_context)
300 thread_context_it = it;
305 QUILL_ASSERT(thread_context_it != _thread_contexts.end(),
306 "Attempting to remove a non-existent thread context in " 307 "ThreadContextManager::unregister_thread_context()");
309 QUILL_ASSERT(!thread_context_it->get()->is_valid(),
310 "Attempting to remove a valid thread context in " 311 "ThreadContextManager::unregister_thread_context()");
313 QUILL_ASSERT(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type(),
314 "Invalid queue type in ThreadContextManager::unregister_thread_context()");
316 if (thread_context->has_unbounded_queue_type())
318 QUILL_ASSERT(thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty(),
319 "Attempting to remove a thread context with a non-empty unbounded queue in " 320 "ThreadContextManager::unregister_thread_context()");
322 else if (thread_context->has_bounded_queue_type())
324 QUILL_ASSERT(thread_context->get_spsc_queue_union().bounded_spsc_queue.empty(),
325 "Attempting to remove a thread context with a non-empty bounded queue in " 326 "ThreadContextManager::unregister_thread_context()");
329 _thread_contexts.erase(thread_context_it);
332 QUILL_ASSERT(_invalid_thread_context_count.load(std::memory_order_relaxed) != 0,
333 "_invalid_thread_context_count underflow in " 334 "ThreadContextManager::unregister_thread_context()");
335 _invalid_thread_context_count.fetch_sub(1, std::memory_order_relaxed);
343 std::vector<std::shared_ptr<ThreadContext>> _thread_contexts;
345 std::atomic<bool> _new_thread_context_flag{
false};
346 std::atomic<uint32_t> _invalid_thread_context_count{0};
354 size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy)
355 : _thread_context(std::make_shared<ThreadContext>(
356 queue_type, initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy))
358 #if defined(QUILL_ENABLE_ASSERTIONS) || !defined(NDEBUG) 364 thread_local
bool thread_local_instance_created =
false;
366 QUILL_ASSERT(!thread_local_instance_created,
367 R
"(ScopedThreadContext can only be instantiated once per thread. It appears you may be combining default FrontendOptions with custom FrontendOptions. Ensure only one set of FrontendOptions is used to maintain a single thread context per thread.)"); 369 thread_local_instance_created = true;
372 ThreadContextManager::instance().register_thread_context(_thread_context);
388 _thread_context->mark_invalid();
391 ThreadContextManager::instance().add_invalid_thread_context();
395 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
ThreadContext* get_thread_context()
const noexcept
397 QUILL_ASSERT(_thread_context,
398 "_thread_context cannot be null in ScopedThreadContext::get_thread_context()");
399 return _thread_context.get();
407 std::shared_ptr<ThreadContext> _thread_context;
415 QueueType queue_type,
size_t initial_queue_capacity,
size_t unbounded_queue_max_capacity,
416 HugePagesPolicy huge_pages_policy)
419 queue_type, initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy};
421 return scoped_thread_context.get_thread_context();
425 template <
typename TFrontendOptions>
426 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
ThreadContext* get_local_thread_context()
429 TFrontendOptions::queue_type, TFrontendOptions::initial_queue_capacity,
430 TFrontendOptions::unbounded_queue_max_capacity, TFrontendOptions::huge_pages_policy);
433 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
A single-producer single-consumer FIFO circular buffer.
Definition: UnboundedSPSCQueue.h:43
Definition: ThreadContextManager.h:349
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT QUILL_EXPORT ThreadContext * get_scoped_thread_context_impl(QueueType queue_type, size_t initial_queue_capacity, size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy)
Non-template implementation that owns the thread local context.
Definition: ThreadContextManager.h:414
Definition: ThreadContextManager.h:223
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED std::string get_thread_name()
Returns the name of the thread.
Definition: ThreadUtilities.h:150
Setups a signal handler to handle fatal signals.
Definition: BackendManager.h:28
Definition: Spinlock.h:18
Definition: Spinlock.h:58
Definition: BackendWorker.h:80
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED uint32_t get_thread_id() noexcept
Returns the os assigned ID of the thread.
Definition: ThreadUtilities.h:213
Definition: ThreadContextManager.h:54