quill
BackendWorker.h
1 
7 #pragma once
8 
9 #if defined(_WIN32)
10  #include "quill/backend/Utf8Conv.h"
11 #endif
12 
13 #include "quill/backend/BackendMdcState.h"
14 #include "quill/backend/BackendOptions.h"
15 #include "quill/backend/BackendUtilities.h"
16 #include "quill/backend/BackendWorkerLock.h"
17 #include "quill/backend/BacktraceStorage.h"
18 #include "quill/backend/PatternFormatter.h"
19 #include "quill/backend/RdtscClock.h"
20 #include "quill/backend/ThreadUtilities.h"
21 #include "quill/backend/TransitEvent.h"
22 #include "quill/backend/TransitEventBuffer.h"
23 
24 #include "quill/core/Attributes.h"
25 #include "quill/core/BoundedSPSCQueue.h"
26 #include "quill/core/ChronoTimeUtils.h"
27 #include "quill/core/Codec.h"
28 #include "quill/core/Common.h"
29 #include "quill/core/DynamicFormatArgStore.h"
30 #include "quill/core/LogLevel.h"
31 #include "quill/core/LoggerBase.h"
32 #include "quill/core/LoggerManager.h"
33 #include "quill/core/MacroMetadata.h"
34 #include "quill/core/MathUtilities.h"
35 #include "quill/core/Metric.h"
36 #include "quill/core/QuillError.h"
37 #include "quill/core/SinkManager.h"
38 #include "quill/core/ThreadContextManager.h"
39 #include "quill/core/ThreadPrimitives.h"
40 #include "quill/core/TimeUtilities.h"
41 #include "quill/core/UnboundedSPSCQueue.h"
42 #include "quill/sinks/Sink.h"
43 
44 #include "quill/bundled/fmt/base.h"
45 
46 #include <algorithm>
47 #include <atomic>
48 #include <chrono>
49 #include <condition_variable>
50 #include <cstddef>
51 #include <cstdint>
52 #include <cstring>
53 #include <ctime>
54 #include <exception>
55 #include <functional>
56 #include <iterator>
57 #include <limits>
58 #include <memory>
59 #include <mutex>
60 #include <optional>
61 #include <string>
62 #include <string_view>
63 #include <thread>
64 #include <unordered_map>
65 #include <utility>
66 #include <vector>
67 
68 QUILL_BEGIN_NAMESPACE
69 
70 class ManualBackendWorker; // Forward declaration
71 
72 namespace detail
73 {
74 
75 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
76  #pragma warning(push)
77  #pragma warning(disable : 4324)
78 #endif
79 
81 {
82 public:
// Default construction only; the worker thread itself is created later, inside run().
86  BackendWorker() = default;
87 
// Copying is disabled (copy constructor and copy assignment are deleted).
91  BackendWorker(BackendWorker const&) = delete;
92  BackendWorker& operator=(BackendWorker const&) = delete;
93 
98  {
99  // This destructor will run during static destruction as the thread is part of the singleton
100  stop();
101 
102  RdtscClock const* rdtsc_clock = _rdtsc_clock.exchange(nullptr);
103  delete rdtsc_clock;
104  }
105 
106  /***/
107  QUILL_NODISCARD bool is_running() const noexcept
108  {
109  return _is_worker_running.load(std::memory_order_acquire);
110  }
111 
115  QUILL_NODISCARD uint64_t time_since_epoch(uint64_t rdtsc_value) const
116  {
117  if (QUILL_UNLIKELY(_options.sleep_duration > _options.rdtsc_resync_interval))
118  {
119  QUILL_THROW(
120  QuillError{"Invalid config, When TSC clock is used backend_thread_sleep_duration should "
121  "not be higher than rdtsc_resync_interval"});
122  }
123 
124  RdtscClock const* rdtsc_clock = _rdtsc_clock.load(std::memory_order_acquire);
125  return rdtsc_clock ? rdtsc_clock->time_since_epoch_safe(rdtsc_value) : 0;
126  }
127 
132  QUILL_NODISCARD uint32_t get_backend_thread_id() const noexcept
133  {
134  return _worker_thread_id.load();
135  }
136 
// Starts the backend worker thread and blocks until that thread reports it is running.
//
// On the calling thread: keeps linker symbols alive, clears the "thread exited" flag, caches the
// process id as a string, validates options.mdc_format_pattern by constructing a temporary
// BackendMdcState, creates the BackendWorkerLock, spawns the thread and then sleeps in 100 us
// steps until _is_worker_running becomes true.
//
// On the spawned thread (the lambda captures `options` by copy): runs _init(); an exception from
// _init() is reported through options.error_notifier and followed by std::terminate(). CPU
// affinity and thread name are then applied, with failures only reported through the notifier.
// After that the running flag is set, _poll() is called in a loop while the flag stays true,
// _exit() is executed and finally _has_worker_thread_exited is set.
//
// @param options backend options; copied into the worker thread's lambda
141  QUILL_ATTRIBUTE_COLD void run(BackendOptions const& options)
142  {
143  _ensure_linker_retains_symbols();
144 
145  _has_worker_thread_exited.store(false);
146 
147  _process_id = std::to_string(get_process_id());
148 
149  // Validate eagerly so Backend::start() can fail on the caller thread instead of surfacing
150  // the error later from the backend poll loop.
151  (void)BackendMdcState{options.mdc_format_pattern};
152 
// NOTE(review): the embedded listing numbering jumps from 152 to 154 here, so a line (possibly a
// condition guarding this block) may be missing from this view. As shown, the lock is created
// unconditionally - confirm against the upstream file.
154  {
155  _backend_worker_lock = std::make_unique<BackendWorkerLock>(_process_id);
156  }
157 
158  std::thread worker(
159  [this, options]()
160  {
// Initialization failures are fatal: report them and terminate.
161  QUILL_TRY { _init(options); }
162 #if !defined(QUILL_NO_EXCEPTIONS)
163  QUILL_CATCH(std::exception const& e)
164  {
165  _notify_error(options.error_notifier, e.what());
166  std::terminate();
167  }
168  QUILL_CATCH_ALL()
169  {
170  _notify_error(options.error_notifier,
171  std::string{"Caught unhandled exception during backend initialization."});
172  std::terminate();
173  }
174 #endif
175 
// From here on the copy stored by _init() in _options is used instead of the captured options.
176  QUILL_TRY
177  {
178  if (!_options.cpu_affinity.empty())
179  {
180  // Set cpu affinity if requested
181  set_cpu_affinity(_options.cpu_affinity);
182  }
183  }
184 #if !defined(QUILL_NO_EXCEPTIONS)
185  QUILL_CATCH(std::exception const& e) { _notify_error(_options.error_notifier, e.what()); }
186  QUILL_CATCH_ALL()
187  {
188  _notify_error(_options.error_notifier, std::string{"Caught unhandled exception."});
189  }
190 #endif
191 
192  QUILL_TRY
193  {
194  // Set the thread name to the desired name
195  set_thread_name(_options.thread_name.data());
196  }
197 #if !defined(QUILL_NO_EXCEPTIONS)
198  QUILL_CATCH(std::exception const& e) { _notify_error(_options.error_notifier, e.what()); }
199  QUILL_CATCH_ALL()
200  {
201  _notify_error(_options.error_notifier, std::string{"Caught unhandled exception."});
202  }
203 #endif
204 
205  // All okay, set the backend worker thread running flag
206  _is_worker_running.store(true);
207 
208  // Running
209  while (QUILL_LIKELY(_is_worker_running.load(std::memory_order_relaxed)))
210  {
211  // main loop
// An exception thrown by one iteration is reported and the loop continues with the next one.
212  QUILL_TRY { _poll(); }
213 #if !defined(QUILL_NO_EXCEPTIONS)
214  QUILL_CATCH(std::exception const& e) { _notify_error(_options.error_notifier, e.what()); }
215  QUILL_CATCH_ALL()
216  {
217  _notify_error(_options.error_notifier, std::string{"Caught unhandled exception."});
218  }
219 #endif
220  }
221 
222  // exit
223  QUILL_TRY { _exit(); }
224 #if !defined(QUILL_NO_EXCEPTIONS)
225  QUILL_CATCH(std::exception const& e) { _notify_error(_options.error_notifier, e.what()); }
226  QUILL_CATCH_ALL()
227  {
228  _notify_error(_options.error_notifier, std::string{"Caught unhandled exception."});
229  }
230 #endif
231 
// stop() checks this flag after joining to decide whether it has to run _exit() itself.
232  _has_worker_thread_exited.store(true);
233  });
234 
235  // Move the worker ownership to our class
236  _worker_thread.swap(worker);
237 
238  while (!_is_worker_running.load(std::memory_order_seq_cst))
239  {
240  // wait for the thread to start
241  detail::sleep_for_ns(100'000ull); // 100 us
242  }
243  }
244 
248  QUILL_ATTRIBUTE_COLD void stop() noexcept
249  {
250  // Stop the backend worker
251  if (!_is_worker_running.exchange(false))
252  {
253  // already stopped
254  return;
255  }
256 
257  // signal wake up the backend worker thread
258  notify();
259 
260  // Wait the backend thread to join, if backend thread was never started it won't be joinable
261  if (_worker_thread.joinable())
262  {
263  _worker_thread.join();
264  }
265 
266  if (!_has_worker_thread_exited.load())
267  {
268  // The backend thread was terminated externally without completing its exit sequence.
269  // This can happen on Windows when Quill is used inside a DLL
270  // Drain any remaining log messages on this thread as a best-effort fallback.
271  QUILL_TRY { _exit(); }
272 #if !defined(QUILL_NO_EXCEPTIONS)
273  QUILL_CATCH(std::exception const& e) { _notify_error(_options.error_notifier, e.what()); }
274  QUILL_CATCH_ALL() { _notify_error(_options.error_notifier, std::string{"Caught unhandled exception."}); }
275 #endif
276  }
277 
278  _worker_thread_id.store(0);
279  _backend_worker_lock.reset(nullptr);
280 
281  // Release scratch state owned by the worker so a subsequent start() begins from a clean
282  // slate and we don't hold on to memory grown during the previous run. The frontend queues,
283  // sinks, and loggers themselves are owned by the singleton managers and intentionally
284  // survive restarts; only the worker's own scratch state is freed here. The worker thread
285  // has already joined above so these members are not concurrently accessed. Reassigning
286  // from a fresh empty instance both clears the contents and releases any reserved capacity.
287  _named_args_templates = {};
288  _logger_removal_flags = {};
289  _removed_loggers = {};
290  _active_thread_contexts_cache = {};
291  _active_sinks_cache = {};
292  _named_args_format_template = {};
293  _format_args_store = DynamicFormatArgStore{};
294  }
295 
300  void notify()
301  {
302 #ifdef __MINGW32__
303  // MinGW can deadlock if the mutex is released before cv.notify_one(),
304  // so keep notify_one() inside the lock for MinGW
305  std::lock_guard<std::mutex> lock{_wake_up_mutex};
306  _wake_up_flag = true;
307  _wake_up_cv.notify_one();
308 #else
309  // Set the flag to indicate that the data is ready
310  {
311  std::lock_guard<std::mutex> lock{_wake_up_mutex};
312  _wake_up_flag = true;
313  }
314 
315  // Signal the condition variable to wake up the worker thread
316  _wake_up_cv.notify_one();
317 #endif
318  }
319 
320 private:
321  /***/
322  QUILL_ATTRIBUTE_HOT void _invoke_poll_hook(std::function<void()> const& hook) const
323  {
324  QUILL_TRY { hook(); }
325 #if !defined(QUILL_NO_EXCEPTIONS)
326  QUILL_CATCH(std::exception const& e) { _notify_error(_options.error_notifier, e.what()); }
327  QUILL_CATCH_ALL()
328  {
329  _notify_error(_options.error_notifier, std::string{"Caught unhandled exception."});
330  }
331 #endif
332  }
333 
334  /***/
335  QUILL_ATTRIBUTE_HOT void _invoke_poll_end_once(bool& poll_end_called) const
336  {
337  if (poll_end_called)
338  {
339  return;
340  }
341 
342  poll_end_called = true;
343  _invoke_poll_hook(_options.backend_worker_on_poll_end);
344  }
345 
346  template <typename TMessage>
347  static void _notify_error(std::function<void(std::string const&)> const& error_notifier, TMessage&& message)
348  {
349  if (static_cast<bool>(error_notifier))
350  {
351  QUILL_TRY { error_notifier(static_cast<TMessage&&>(message)); }
352 #if !defined(QUILL_NO_EXCEPTIONS)
353  QUILL_CATCH_ALL()
354  {
355  // Swallow exceptions from the user-provided error_notifier to prevent
356  // infinite loops when both formatting and the notifier throw.
357  }
358 #endif
359  }
360  }
361 
366  QUILL_ATTRIBUTE_COLD static void _ensure_linker_retains_symbols()
367  {
368  // Calls to ensure it is retained by the linker.
369  QUILL_MAYBE_UNUSED static auto thread_name = get_thread_name();
370  (void)thread_name;
371 
372 #if defined(_WIN32)
373  std::wstring const dummy = L"dummy";
374  QUILL_MAYBE_UNUSED static auto encode1 = utf8_encode(dummy);
375  (void)encode1;
376 
377  QUILL_MAYBE_UNUSED static auto encode2 =
378  utf8_encode(reinterpret_cast<std::byte const*>(dummy.data()), dummy.size());
379  (void)encode2;
380 #endif
381  }
382 
386  QUILL_ATTRIBUTE_HOT void _poll()
387  {
388  // poll hook
389  bool poll_end_called = false;
390  if (QUILL_UNLIKELY(static_cast<bool>(_options.backend_worker_on_poll_begin)))
391  {
392  _invoke_poll_hook(_options.backend_worker_on_poll_begin);
393  }
394 
395  // load all contexts locally
396  _update_active_thread_contexts_cache();
397 
398  // Read all frontend queues and cache the log statements and the metadata as TransitEvents
399  size_t const cached_transit_events_count = _populate_transit_events_from_frontend_queues();
400 
401  if (cached_transit_events_count != 0)
402  {
403  // there are cached events to process
404  if (cached_transit_events_count < _options.transit_events_soft_limit)
405  {
406  // process a single transit event, then give priority to reading the frontend queues again
407  _process_lowest_timestamp_transit_event();
408  }
409  else
410  {
411  // we want to process a batch of events.
412  while (!has_pending_events_for_caching_when_transit_event_buffer_empty() &&
413  _process_lowest_timestamp_transit_event())
414  {
415  // We need to be cautious because there are log messages in the lock-free queues
416  // that have not yet been cached in the transit event buffer. Logging only the cached
417  // messages can result in out-of-order log entries, as messages with lower timestamps
418  // in the queue might be missed.
419  }
420  }
421  }
422  else
423  {
424  // No cached transit events to process, minimal thread workload.
425 
426  // force flush all remaining messages
427  _flush_and_run_active_sinks(true, _options.sink_min_flush_interval);
428 
429  // check for any dropped events / blocked threads
430  _check_failure_counter(_options.error_notifier);
431 
432  // This is useful when BackendTscClock is used to keep it up to date
433  _resync_rdtsc_clock();
434 
435  // Also check if all queues are empty
436  bool const queues_and_events_empty = _check_frontend_queues_and_cached_transit_events_empty();
437  if (queues_and_events_empty)
438  {
439  _cleanup_invalidated_thread_contexts();
440  _cleanup_invalidated_loggers();
441  _try_shrink_empty_transit_event_buffers();
442 
443  // poll hook
444  if (QUILL_UNLIKELY(static_cast<bool>(_options.backend_worker_on_poll_end)))
445  {
446  _invoke_poll_end_once(poll_end_called);
447  }
448 
449  // There is nothing left to do, and we can let this thread sleep for a while
450  // buffer events are 0 here and also all the producer queues are empty
451  if (_options.sleep_duration.count() != 0)
452  {
453  std::unique_lock<std::mutex> lock{_wake_up_mutex};
454 
455  // Wait for a timeout or a notification to wake up
456  _wake_up_cv.wait_for(lock, _options.sleep_duration, [this] { return _wake_up_flag; });
457 
458  // set the flag back to false since we woke up here
459  _wake_up_flag = false;
460 
461  // After waking up resync rdtsc clock again and resume
462  _resync_rdtsc_clock();
463  }
464  else if (_options.enable_yield_when_idle)
465  {
467  }
468  }
469  }
470 
471  // poll hook
472  if (QUILL_UNLIKELY(static_cast<bool>(_options.backend_worker_on_poll_end)))
473  {
474  _invoke_poll_end_once(poll_end_called);
475  }
476  }
477 
481  QUILL_ATTRIBUTE_COLD void _init(BackendOptions const& options)
482  {
483  _options = options;
484 
485  // ManualBackendWorker::init() calls _init() directly, so validate here as well.
486  (void)BackendMdcState{_options.mdc_format_pattern};
487 
488  (void)get_thread_name();
489 
490  // Double check or modify some backend options before we start
491  if (_options.transit_events_hard_limit == 0)
492  {
493  // transit_events_hard_limit of 0 makes no sense as we can't process anything
494  _options.transit_events_hard_limit = 1;
495  }
496 
497  if (_options.transit_events_soft_limit == 0)
498  {
499  _options.transit_events_soft_limit = 1;
500  }
501 
503  {
504  QUILL_THROW(QuillError{fmtquill::format(
505  "transit_events_soft_limit ({}) cannot be greater than transit_events_hard_limit "
506  "({}). Please ensure that the soft limit is less than or equal to the hard limit.",
508  }
509  else if (!is_power_of_two(_options.transit_events_hard_limit))
510  {
511  QUILL_THROW(QuillError{fmtquill::format(
512  "transit_events_hard_limit ({}) must be a power of two", _options.transit_events_hard_limit)});
513  }
514  else if (!is_power_of_two(_options.transit_events_soft_limit))
515  {
516  QUILL_THROW(QuillError{fmtquill::format(
517  "transit_events_soft_limit ({}) must be a power of two", _options.transit_events_soft_limit)});
518  }
519 
520  _last_output_timestamp = 0;
521 
522  // Backend::stop() releases the worker's cache, but thread-local contexts can
523  // outlive the backend thread and be reused after a later Backend::start().
524  // Refresh unconditionally so existing frontend queues are visible again.
525  _update_active_thread_contexts_cache(true);
526 
527  // Cache this thread's id only after initialization has succeeded. ManualBackendWorker::init()
528  // calls this function on the caller thread and can propagate validation errors.
529  uint32_t const worker_thread_id = get_thread_id();
530  _worker_thread_id.store(worker_thread_id);
531  LoggerBase::set_current_thread_is_backend_thread(true);
532  }
533 
534  /***/
535  void _clear_backend_thread_flag() noexcept
536  {
537  LoggerBase::set_current_thread_is_backend_thread(false);
538  }
539 
543  QUILL_ATTRIBUTE_COLD void _exit()
544  {
545  while (true)
546  {
547  bool const queues_and_events_empty = (!_options.wait_for_queues_to_empty_before_exit) ||
548  _check_frontend_queues_and_cached_transit_events_empty();
549 
550  if (queues_and_events_empty)
551  {
552  // we are done, all queues are now empty
553  _check_failure_counter(_options.error_notifier);
554  _flush_and_run_active_sinks(false, std::chrono::milliseconds{0});
555  break;
556  }
557 
558  uint64_t const cached_transit_events_count = _populate_transit_events_from_frontend_queues();
559  if (cached_transit_events_count > 0)
560  {
561  while (!has_pending_events_for_caching_when_transit_event_buffer_empty() &&
562  _process_lowest_timestamp_transit_event())
563  {
564  // We need to be cautious because there are log messages in the lock-free queues
565  // that have not yet been cached in the transit event buffer. Logging only the cached
566  // messages can result in out-of-order log entries, as messages with lower timestamps
567  // in the queue might be missed.
568  }
569  }
570  }
571 
572  _cleanup_invalidated_thread_contexts();
573  _cleanup_invalidated_loggers();
574 
575  // Tear down the RdtscClock so a subsequent Backend::start() can recalibrate
576  // using the current options (e.g. a different rdtsc_resync_interval). The
577  // documented contract of Backend::stop() is that no other Quill API may run
578  // concurrently with it, including BackendTscClock::now()/to_time_point on
579  // user threads, so this is safe.
580  RdtscClock const* rdtsc_clock = _rdtsc_clock.exchange(nullptr, std::memory_order_acq_rel);
581  delete rdtsc_clock;
582 
583  _clear_backend_thread_flag();
584  }
585 
589  QUILL_ATTRIBUTE_HOT size_t _populate_transit_events_from_frontend_queues()
590  {
591  uint64_t const ts_now = _options.log_timestamp_ordering_grace_period.count()
593  static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(_options.log_timestamp_ordering_grace_period)
594  .count()))
595  : std::numeric_limits<uint64_t>::max();
596 
597  size_t total_cached_transit_events_count{0};
598 
599  for (ThreadContext* thread_context : _active_thread_contexts_cache)
600  {
601  QUILL_ASSERT(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type(),
602  "Invalid queue type in BackendWorker::_read_and_decode_frontend_queues()");
603 
604  if (thread_context->has_unbounded_queue_type())
605  {
606  total_cached_transit_events_count += _read_and_decode_frontend_queue(
607  thread_context->get_spsc_queue_union().unbounded_spsc_queue, thread_context, ts_now);
608  }
609  else if (thread_context->has_bounded_queue_type())
610  {
611  total_cached_transit_events_count += _read_and_decode_frontend_queue(
612  thread_context->get_spsc_queue_union().bounded_spsc_queue, thread_context, ts_now);
613  }
614  }
615 
616  return total_cached_transit_events_count;
617  }
618 
626  template <typename TFrontendQueue>
627  QUILL_ATTRIBUTE_HOT size_t _read_and_decode_frontend_queue(TFrontendQueue& frontend_queue,
628  ThreadContext* thread_context, uint64_t ts_now)
629  {
630  // Note: The producer commits only complete messages to the queue.
631  // Therefore, if even a single byte is present in the queue, it signifies a full message.
632  size_t const queue_capacity = frontend_queue.capacity();
633  size_t total_bytes_read{0};
634 
635  do
636  {
637  std::byte* read_pos;
638 
639  if constexpr (std::is_same_v<TFrontendQueue, UnboundedSPSCQueue>)
640  {
641  read_pos = _read_unbounded_frontend_queue(frontend_queue, thread_context);
642  }
643  else
644  {
645  read_pos = frontend_queue.prepare_read();
646  }
647 
648  if (!read_pos)
649  {
650  // Exit loop nothing to read
651  break;
652  }
653 
654  std::byte const* const read_begin = read_pos;
655 
656  if (!_populate_transit_event_from_frontend_queue(read_pos, thread_context, ts_now))
657  {
658  // If _get_transit_event_from_queue returns false, stop reading
659  break;
660  }
661 
662  // Finish reading
663  QUILL_ASSERT(
664  read_pos >= read_begin,
665  "read_pos must be >= read_begin in BackendWorker::_read_and_decode_frontend_queue()");
666 
667  auto const bytes_read = static_cast<size_t>(read_pos - read_begin);
668  frontend_queue.finish_read(bytes_read);
669  total_bytes_read += bytes_read;
670  // Reads a maximum of one full frontend queue or the transit events' hard limit to prevent
671  // getting stuck on the same producer.
672  } while ((total_bytes_read < queue_capacity) &&
673  (thread_context->_transit_event_buffer->size() < _options.transit_events_hard_limit));
674 
675  if (total_bytes_read != 0)
676  {
677  // If we read something from the queue, we commit all the reads together at the end.
678  // This strategy enhances cache coherence performance by updating the shared atomic flag
679  // only once.
680  frontend_queue.commit_read();
681  }
682 
683  return thread_context->_transit_event_buffer->size();
684  }
685 
686  /***/
// Decodes a single message starting at read_pos into the next free TransitEvent slot of the
// thread context's transit event buffer.
//
// A message starts with four 64-bit header words: [0] timestamp, [1] MacroMetadata pointer,
// [2] LoggerBase pointer, [3] either the FormatArgsDecoder bits or, for Metric events, the bit
// pattern of a double. What follows the header depends on the event type.
//
// @param read_pos in/out read position; advanced past everything that was decoded. Note that it
//                 has already been advanced past the header when false is returned because of
//                 the timestamp check below.
// @param thread_context thread context that owns the transit event buffer (and the MDC state
//                       the MDC events are applied to)
// @param ts_now timestamp cutoff; the maximum uint64_t value disables the timestamp check
// @return false when obtaining the slot via back() threw, or when the message timestamp is
//         greater than ts_now (checked only for non-User clock sources); true otherwise. The
//         slot is committed with push_back() for every event except the MDC events.
687  QUILL_ATTRIBUTE_HOT bool _populate_transit_event_from_frontend_queue(std::byte*& read_pos,
688  ThreadContext* thread_context,
689  uint64_t ts_now)
690  {
691  QUILL_ASSERT(thread_context->_transit_event_buffer,
692  "transit_event_buffer is nullptr in "
693  "BackendWorker::_populate_transit_event_from_frontend_queue()");
694 
695  // Allocate a new TransitEvent or use an existing one to store the message from the queue
696  TransitEvent* transit_event{nullptr};
697 
698  QUILL_TRY { transit_event = thread_context->_transit_event_buffer->back(); }
699 #if !defined(QUILL_NO_EXCEPTIONS)
700  QUILL_CATCH_ALL()
701  {
702  // Growing the transit event buffer failed (allocation failure). Nothing has been consumed
703  // from the queue yet, so leave the message where it is and stop reading this queue. The
704  // poll loop keeps processing already-cached events, which frees buffer slots so a later
705  // attempt succeeds at the current capacity without allocating.
706  // Pass a literal here: formatting a message could allocate and throw again, and an
707  // exception escaping this handler would skip event processing and prevent recovery.
708  _notify_error(_options.error_notifier,
709  "Quill ERROR: Failed to expand the transit event buffer (allocation failure). "
710  "The log message remains in the thread queue and will be retried");
711  return false;
712  }
713 #endif
714 
715  QUILL_ASSERT(
716  transit_event,
717  "transit_event is nullptr in BackendWorker::_populate_transit_event_from_frontend_queue()");
718 
719  QUILL_ASSERT(
720  transit_event->formatted_msg,
721  "formatted_msg is nullptr in BackendWorker::_populate_transit_event_from_frontend_queue()");
722 
723  static_assert(sizeof(FormatArgsDecoder) == sizeof(uintptr_t),
724  "FormatArgsDecoder must fit in uintptr_t for packed header decoding");
725  static_assert(sizeof(uintptr_t) <= sizeof(uint64_t),
726  "Packed header decoding requires pointers to fit in 64 bits");
727 
// Copy the four header words out of the queue with memcpy and advance past them.
728  uint64_t header_words[4];
729  std::memcpy(header_words, read_pos, sizeof(header_words));
730  read_pos += sizeof(header_words);
731 
732  transit_event->timestamp = header_words[0];
733  transit_event->macro_metadata =
734  reinterpret_cast<MacroMetadata const*>(static_cast<uintptr_t>(header_words[1]));
735  transit_event->logger_base = reinterpret_cast<LoggerBase*>(static_cast<uintptr_t>(header_words[2]));
736 
737  QUILL_ASSERT(transit_event->logger_base,
738  "transit_event->logger_base is nullptr after memcpy from queue");
739 
740  QUILL_ASSERT(transit_event->logger_base->_clock_source == ClockSourceType::Tsc ||
741  transit_event->logger_base->_clock_source == ClockSourceType::System ||
742  transit_event->logger_base->_clock_source == ClockSourceType::User,
743  "transit_event->logger_base->_clock_source has invalid enum value - possible "
744  "memory corruption");
745 
746  if (transit_event->logger_base->_clock_source == ClockSourceType::Tsc)
747  {
748  // If using the rdtsc clock, convert the value to nanoseconds since epoch.
749  // This conversion ensures that every transit inserted in the buffer below has a timestamp in
750  // nanoseconds since epoch, allowing compatibility with Logger objects using different clocks.
751  if (QUILL_UNLIKELY(!_rdtsc_clock.load(std::memory_order_relaxed)))
752  {
753  // Lazy initialization of rdtsc clock on the backend thread only if the user decides to use
754  // it. The clock requires a few seconds to init as it is taking samples first.
755  _rdtsc_clock.store(new RdtscClock{_options.rdtsc_resync_interval}, std::memory_order_release);
756  _last_rdtsc_resync_time = std::chrono::steady_clock::now();
757  }
758 
759  // Convert the rdtsc value to nanoseconds since epoch.
760  transit_event->timestamp =
761  _rdtsc_clock.load(std::memory_order_relaxed)->time_since_epoch(transit_event->timestamp);
762  }
763 
764  // Check if strict log timestamp order is enabled and the clock source is not User
765  if ((transit_event->logger_base->_clock_source != ClockSourceType::User) &&
766  (ts_now != std::numeric_limits<uint64_t>::max()))
767  {
768  // We only check against `ts_now` for real timestamps, not custom timestamps by the user, and
769  // when the grace period is enabled
770 
771 #if defined(QUILL_ENABLE_ASSERTIONS) || !defined(NDEBUG)
772  // Check the timestamps we are comparing have the same digits
773  auto count_digits = [](uint64_t number)
774  {
775  uint32_t digits = 0;
776  do
777  {
778  digits++;
779  number /= 10;
780  } while (number != 0);
781  return digits;
782  };
783 
784  QUILL_ASSERT(count_digits(transit_event->timestamp) == count_digits(ts_now),
785  "Timestamp digits mismatch in "
786  "BackendWorker::_populate_transit_event_from_frontend_queue(), log timestamp "
787  "has different magnitude than current time");
788 #endif
789 
790  // Ensure the message timestamp is not greater than ts_now.
791  if (QUILL_UNLIKELY(transit_event->timestamp > ts_now))
792  {
793  // If the message timestamp is ahead of the grace-period cutoff, temporarily halt
794  // processing this queue. This keeps newer events in the queue until other frontend
795  // threads have had the configured grace window to publish older timestamped events.
796  // We return at this point without adding the current event to the buffer.
797  return false;
798  }
799  }
800 
// MDC events are applied to the thread context below and are not committed as transit events.
801  bool const is_mdc_event = (transit_event->macro_metadata->event() == MacroMetadata::Event::MdcSet) ||
802  (transit_event->macro_metadata->event() == MacroMetadata::Event::MdcErase) ||
803  (transit_event->macro_metadata->event() == MacroMetadata::Event::MdcClear);
804 
805  if (transit_event->macro_metadata->event() != MacroMetadata::Event::Metric)
806  {
// For every event other than Metric the fourth header word holds the FormatArgsDecoder bits;
// they are copied into a FormatArgsDecoder object with memcpy.
807  uintptr_t const decoder_bits = static_cast<uintptr_t>(header_words[3]);
808  FormatArgsDecoder format_args_decoder;
809  std::memcpy(&format_args_decoder, &decoder_bits, sizeof(format_args_decoder));
810 
811  if (is_mdc_event)
812  {
813  _apply_mdc_event(*thread_context, transit_event->macro_metadata->event(), format_args_decoder, read_pos);
814  }
815  else
816  {
817  if ((transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataDeepCopy) ||
818  (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataShallowCopy) ||
819  (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataHybridCopy))
820  {
821  _apply_runtime_metadata(read_pos, transit_event);
822  }
823 
824  // we need to check and do not try to format the flush events as that wouldn't be valid
825  if ((transit_event->macro_metadata->event() != MacroMetadata::Event::Flush) &&
826  (transit_event->macro_metadata->event() != MacroMetadata::Event::LoggerRemovalRequest))
827  {
// Decode the arguments from the queue into the shared _format_args_store; the store is cleared
// again at the end of this function.
828  format_args_decoder(read_pos, _format_args_store);
829 
830  if (!transit_event->macro_metadata->has_named_args())
831  {
832  _populate_formatted_log_message(transit_event, transit_event->macro_metadata->message_format());
833  }
834  else
835  {
836  // using the message_format as key for lookups
837  _named_args_format_template.assign(transit_event->macro_metadata->message_format());
838 
839  if (auto const search = _named_args_templates.find(_named_args_format_template);
840  search != std::cend(_named_args_templates))
841  {
842  // process named args message when we already have parsed the format message once,
843  // and we have the names of each arg cached
844  auto const& [message_format, arg_names] = search->second;
845 
846  _populate_formatted_log_message(transit_event, message_format.data());
847  _populate_formatted_named_args(transit_event, arg_names);
848  }
849  else
850  {
851  // process named args log when the message format is processed for the first time
852  // parse name of each arg and stored them to our lookup map
853  auto const [res_it, inserted] = _named_args_templates.try_emplace(
854  _named_args_format_template,
855  _process_named_args_format_message(transit_event->macro_metadata->message_format()));
856 
857  auto const& [message_format, arg_names] = res_it->second;
858 
859  // suppress unused warnings
860  (void)inserted;
861 
862  _populate_formatted_log_message(transit_event, message_format.data());
863  _populate_formatted_named_args(transit_event, arg_names);
864  }
865  }
866 
867  _set_transit_event_mdc(*thread_context, transit_event);
868  }
869  else if (transit_event->macro_metadata->event() == MacroMetadata::Event::Flush)
870  {
871  // if this is a flush event then we do not need to format anything for the
872  // transit_event, but we need to set the transit event's flush_flag pointer instead
873  uintptr_t flush_flag_tmp;
874  std::memcpy(&flush_flag_tmp, read_pos, sizeof(uintptr_t));
875  transit_event->set_flush_flag(reinterpret_cast<std::atomic<bool>*>(flush_flag_tmp));
876  read_pos += sizeof(uintptr_t);
877  }
878  else if (transit_event->macro_metadata->event() == MacroMetadata::Event::LoggerRemovalRequest)
879  {
880  // Store the logger name and the sync flag
881  uintptr_t logger_removal_flag_tmp;
882  std::memcpy(&logger_removal_flag_tmp, read_pos, sizeof(uintptr_t));
883  read_pos += sizeof(uintptr_t);
884  std::string_view const logger_name = Codec<std::string>::decode_arg(read_pos);
885 
886  _logger_removal_flags.emplace(
887  std::string{logger_name}, reinterpret_cast<std::atomic<bool>*>(logger_removal_flag_tmp));
888  }
889  else
890  {
891  QUILL_ASSERT(
892  false,
893  "Unhandled event type in BackendWorker::_populate_transit_event_from_frontend_queue()");
894  }
895  }
896  }
897  else
898  {
// Metric events carry a double in the fourth header word; it is copied out with memcpy.
899  double metric_value;
900  std::memcpy(&metric_value, &header_words[3], sizeof(metric_value));
901  transit_event->set_metric_value(metric_value);
902  }
903 
904  if (!is_mdc_event)
905  {
906  // commit this transit event
907  thread_context->_transit_event_buffer->push_back();
908  }
909 
910  _format_args_store.clear();
911 
912  return true;
913  }
914 
919  QUILL_ATTRIBUTE_HOT bool has_pending_events_for_caching_when_transit_event_buffer_empty() noexcept
920  {
921  _update_active_thread_contexts_cache();
922 
923  for (ThreadContext* thread_context : _active_thread_contexts_cache)
924  {
925  QUILL_ASSERT(thread_context->_transit_event_buffer,
926  "transit_event_buffer is nullptr in "
927  "BackendWorker::has_pending_events_for_caching_when_transit_event_buffer_empty()"
928  ", should be valid in _active_thread_contexts_cache");
929 
930  if (thread_context->_transit_event_buffer->empty())
931  {
932  // if there is no _transit_event_buffer yet then check only the queue
933  if (thread_context->has_unbounded_queue_type() &&
934  !thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty())
935  {
936  return true;
937  }
938 
939  if (thread_context->has_bounded_queue_type() &&
940  !thread_context->get_spsc_queue_union().bounded_spsc_queue.empty())
941  {
942  return true;
943  }
944  }
945  }
946 
947  return false;
948  }
949 
953  QUILL_ATTRIBUTE_HOT bool _process_lowest_timestamp_transit_event()
954  {
955  // Get the lowest timestamp
956  uint64_t min_ts{std::numeric_limits<uint64_t>::max()};
957  ThreadContext* thread_context{nullptr};
958 
959  for (ThreadContext* tc : _active_thread_contexts_cache)
960  {
961  QUILL_ASSERT(tc->_transit_event_buffer,
962  "transit_event_buffer is nullptr in "
963  "BackendWorker::_process_lowest_timestamp_transit_event(), should be valid in "
964  "_active_thread_contexts_cache");
965 
966  TransitEvent const* te = tc->_transit_event_buffer->front();
967  if (te && (min_ts > te->timestamp))
968  {
969  min_ts = te->timestamp;
970  thread_context = tc;
971  }
972  }
973 
974  if (!thread_context)
975  {
976  // all transit event buffers are empty
977  return false;
978  }
979 
980  TransitEvent* transit_event = thread_context->_transit_event_buffer->front();
981  QUILL_ASSERT(
982  transit_event,
983  "transit_event is nullptr in BackendWorker::_process_lowest_timestamp_transit_event() when "
984  "transit_buffer is set");
985 
986  std::atomic<bool>* flush_flag{nullptr};
987 
988  QUILL_TRY { _process_transit_event(*thread_context, *transit_event, flush_flag); }
989 #if !defined(QUILL_NO_EXCEPTIONS)
990  QUILL_CATCH(std::exception const& e) { _notify_error(_options.error_notifier, e.what()); }
991  QUILL_CATCH_ALL()
992  {
993  _notify_error(_options.error_notifier, std::string{"Caught unhandled exception."});
994  }
995 #endif
996 
997  // Finally, clean up any remaining fields in the transit event
998  if (transit_event->extra_data)
999  {
1000  transit_event->extra_data->named_args.clear();
1001  transit_event->extra_data->mdc.clear();
1002  transit_event->extra_data->runtime_metadata.has_runtime_metadata = false;
1003  }
1004 
1005  // Note: event_payload is reset only in the Flush and Metric branches of _process_transit_event
1006  // where it is actually populated, rather than unconditionally here. This keeps the common
1007  // Log path free of an extra variant assignment.
1008 
1009  thread_context->_transit_event_buffer->pop_front();
1010 
1011  if (flush_flag)
1012  {
1013  // Process the second part of the flush event after it's been removed from the buffer,
1014  // ensuring that we are no longer interacting with the thread_context or transit_event.
1015 
1016  // This is particularly important for handling cases when Quill is used as a DLL on Windows.
1017  // If `FreeLibrary` is called, the backend thread may attempt to access an invalidated
1018  // `ThreadContext`, which can lead to a crash due to invalid memory access.
1019  //
1020  // To prevent this, whenever we receive a Flush event, we clean up any invalidated thread contexts
1021  // before notifying the caller. This ensures that when `flush_log()` is invoked in `DllMain`
1022  // during `DLL_PROCESS_DETACH`, the `ThreadContext` is properly cleaned up before the DLL exits.
1023  _cleanup_invalidated_thread_contexts();
1024 
1025  // Now it’s safe to notify the caller to continue execution.
1026  flush_flag->store(true);
1027  }
1028 
1029  return true;
1030  }
1031 
1035  QUILL_ATTRIBUTE_HOT void _process_transit_event(ThreadContext const& thread_context,
1036  TransitEvent& transit_event, std::atomic<bool>*& flush_flag)
1037  {
1038  // If backend_process(...) throws we want to skip this event and move to the next, so we catch
1039  // the error here instead of catching it in the parent try/catch block of main_loop
1040  if (transit_event.macro_metadata->event() == MacroMetadata::Event::Log)
1041  {
1042  if (transit_event.log_level() != LogLevel::Backtrace)
1043  {
1044  _ensure_monotonic_output_timestamp(transit_event);
1045  _dispatch_transit_event_to_sinks(transit_event, thread_context.thread_id(),
1046  thread_context.thread_name());
1047 
1048  // We also need to check the severity of the log message here against the backtrace
1049  // Check if we should also flush the backtrace messages:
1050  // After we forwarded the message we will check the severity of this message for this logger
1051  // If the severity of the message is higher than the backtrace flush severity we will also
1052  // flush the backtrace of the logger
1053  if (QUILL_UNLIKELY(transit_event.log_level() >=
1054  transit_event.logger_base->_backtrace_flush_level.load(std::memory_order_relaxed)))
1055  {
1056  if (transit_event.logger_base->_backtrace_storage)
1057  {
1058  transit_event.logger_base->_backtrace_storage->process(
1059  [this](TransitEvent const& te, std::string_view thread_id, std::string_view thread_name)
1060  { _dispatch_transit_event_to_sinks(te, thread_id, thread_name); });
1061  }
1062  }
1063  }
1064  else
1065  {
1066  if (transit_event.logger_base->_backtrace_storage)
1067  {
1068  // this is a backtrace log and we will store the transit event
1069  // we need to pass a copy of transit_event here and not move the existing
1070  // the transit events are reused
1071  TransitEvent transit_event_copy;
1072  transit_event.copy_to(transit_event_copy);
1073 
1074  transit_event.logger_base->_backtrace_storage->store(
1075  std::move(transit_event_copy), thread_context.thread_id(), thread_context.thread_name());
1076  }
1077  else
1078  {
1079  QUILL_THROW(
1080  QuillError{"logger->init_backtrace(...) needs to be called first before using "
1081  "LOG_BACKTRACE(...)."});
1082  }
1083  }
1084  }
1085  else if (transit_event.macro_metadata->event() == MacroMetadata::Event::InitBacktrace)
1086  {
1087  // we can just convert the capacity back to int here and use it
1088  if (!transit_event.logger_base->_backtrace_storage)
1089  {
1090  // Lazy BacktraceStorage creation
1091  transit_event.logger_base->_backtrace_storage = std::make_shared<BacktraceStorage>();
1092  }
1093 
1094  transit_event.logger_base->_backtrace_storage->set_capacity(static_cast<uint32_t>(std::stoul(
1095  std::string{transit_event.formatted_msg->begin(), transit_event.formatted_msg->end()})));
1096  }
1097  else if (transit_event.macro_metadata->event() == MacroMetadata::Event::FlushBacktrace)
1098  {
1099  if (transit_event.logger_base->_backtrace_storage)
1100  {
1101  // process all records in backtrace for this logger and log them
1102  transit_event.logger_base->_backtrace_storage->process(
1103  [this](TransitEvent const& te, std::string_view thread_id, std::string_view thread_name)
1104  { _dispatch_transit_event_to_sinks(te, thread_id, thread_name); });
1105  }
1106  }
1107  else if (transit_event.macro_metadata->event() == MacroMetadata::Event::Flush)
1108  {
1109  _flush_and_run_active_sinks(false, std::chrono::milliseconds{0});
1110 
1111  // This is a flush event, so we capture the flush flag to notify the caller after processing.
1112  flush_flag = transit_event.flush_flag();
1113 
1114  // Reset the flush flag as TransitEvents are re-used, preventing incorrect flag reuse.
1115  transit_event.reset_payload();
1116 
1117  // We defer notifying the caller until after this function completes.
1118  }
1119  else if (transit_event.macro_metadata->event() == MacroMetadata::Event::Metric)
1120  {
1121  _ensure_monotonic_output_timestamp(transit_event);
1122  _write_metric_sample(transit_event, thread_context.thread_id(), thread_context.thread_name());
1123 
1124  // Reset the payload as TransitEvents are re-used, so a later reuse of this slot starts
1125  // from a clean variant state instead of still carrying the metric value.
1126  transit_event.reset_payload();
1127  }
1128  else
1129  {
1130  QUILL_ASSERT(transit_event.macro_metadata->event() == MacroMetadata::Event::LoggerRemovalRequest,
1131  "Unexpected transit event type in BackendWorker::_process_transit_event()");
1132  // LoggerRemovalRequest is handled when the removal flag map is populated. It has no sink
1133  // dispatch work of its own.
1134  }
1135  }
1136 
1142  QUILL_ATTRIBUTE_HOT void _ensure_monotonic_output_timestamp(TransitEvent& transit_event) noexcept
1143  {
1144  if (!_options.ensure_monotonic_output_timestamps)
1145  {
1146  return;
1147  }
1148 
1149  if (QUILL_UNLIKELY(transit_event.timestamp < _last_output_timestamp))
1150  {
1151  transit_event.timestamp = (_last_output_timestamp == std::numeric_limits<uint64_t>::max())
1152  ? _last_output_timestamp
1153  : _last_output_timestamp + 1;
1154  }
1155 
1156  _last_output_timestamp = transit_event.timestamp;
1157  }
1158 
1162  QUILL_ATTRIBUTE_HOT void _dispatch_transit_event_to_sinks(TransitEvent const& transit_event,
1163  std::string_view thread_id, std::string_view thread_name)
1164  {
1165  // First check to see if we should init the pattern formatter on a new Logger
1166  // Look up to see if we have the formatter and if not create it
1167  if (QUILL_UNLIKELY(!transit_event.logger_base->_pattern_formatter))
1168  {
1169  // Search for an existing pattern_formatter in each logger
1170  _logger_manager.for_each_logger(
1171  [&transit_event](LoggerBase* logger)
1172  {
1173  if (logger->_pattern_formatter &&
1174  (logger->_pattern_formatter->get_options() == transit_event.logger_base->_pattern_formatter_options))
1175  {
1176  // hold a copy of the shared_ptr of the same formatter
1177  transit_event.logger_base->_pattern_formatter = logger->_pattern_formatter;
1178  return true;
1179  }
1180 
1181  return false;
1182  });
1183 
1184  if (!transit_event.logger_base->_pattern_formatter)
1185  {
1186  // Didn't find an existing formatter need to create a new pattern formatter
1187  transit_event.logger_base->_pattern_formatter =
1188  std::make_shared<PatternFormatter>(transit_event.logger_base->_pattern_formatter_options);
1189  }
1190  }
1191 
1192  QUILL_ASSERT(transit_event.logger_base->_pattern_formatter,
1193  "pattern_formatter is nullptr in BackendWorker::_process_transit_event(), should "
1194  "have been created earlier");
1195 
1196  // proceed after ensuring a pattern formatter exists
1197  std::string_view const log_level_description =
1198  log_level_to_string(transit_event.log_level(), _options.log_level_descriptions.data(),
1199  _options.log_level_descriptions.size());
1200 
1201  std::string_view const log_level_short_code =
1202  log_level_to_string(transit_event.log_level(), _options.log_level_short_codes.data(),
1203  _options.log_level_short_codes.size());
1204 
1205  _write_log_statement(
1206  transit_event, thread_id, thread_name, log_level_description, log_level_short_code,
1207  std::string_view{transit_event.formatted_msg->data(), transit_event.formatted_msg->size()});
1208  }
1209 
1213  QUILL_ATTRIBUTE_HOT void _write_metric_sample(TransitEvent const& transit_event, std::string_view thread_id,
1214  std::string_view thread_name) const
1215  {
1216  QUILL_ASSERT(
1217  transit_event.macro_metadata,
1218  "transit_event.macro_metadata is nullptr in BackendWorker::_write_metric_sample()");
1219  QUILL_ASSERT(transit_event.macro_metadata->event() == MacroMetadata::Event::Metric,
1220  "Unexpected transit event type in BackendWorker::_write_metric_sample()");
1221 
1222  // MacroMetadata is the first non-virtual base of MetricMetadata so static_cast is well-defined
1223  // here. The Event::Metric check above guarantees the dynamic type.
1224  auto const* metric_metadata = static_cast<MetricMetadata const*>(transit_event.macro_metadata);
1225  double const metric_value = transit_event.metric_value();
1226 
1227  for (auto& sink : transit_event.logger_base->_sinks)
1228  {
1229  sink->write_metric(metric_metadata, transit_event.timestamp, thread_id, thread_name,
1230  _process_id, transit_event.logger_base->_logger_name, metric_value);
1231  }
1232  }
1233 
1237  QUILL_ATTRIBUTE_HOT void _write_log_statement(TransitEvent const& transit_event,
1238  std::string_view const& thread_id,
1239  std::string_view const& thread_name,
1240  std::string_view const& log_level_description,
1241  std::string_view const& log_level_short_code,
1242  std::string_view const& log_message) const
1243  {
1244  std::string_view default_log_statement;
1245 
1246  // Process each sink with the appropriate formatting and filtering
1247  for (auto& sink : transit_event.logger_base->_sinks)
1248  {
1249  std::string_view log_to_write;
1250 
1251  // Determine which formatted log to use
1252  if (!sink->_override_pattern_formatter_options)
1253  {
1254  if (default_log_statement.empty())
1255  {
1256  // Use the default formatted log statement, here by checking empty() we try to format once
1257  // even for multiple sinks
1258  default_log_statement = transit_event.logger_base->_pattern_formatter->format(
1259  transit_event.timestamp, thread_id, thread_name, _process_id, transit_event.logger_base->_logger_name,
1260  log_level_description, log_level_short_code, *transit_event.macro_metadata,
1261  transit_event.get_named_args(), log_message, transit_event.mdc());
1262  }
1263 
1264  log_to_write = default_log_statement;
1265  }
1266  else
1267  {
1268  // Sink has override_pattern_formatter_options, we do not include PatternFormatter
1269  // in the frontend fo this reason we init PatternFormatter here
1270  if (!sink->_override_pattern_formatter)
1271  {
1272  // Initialize override formatter if needed
1273  sink->_override_pattern_formatter =
1274  std::make_shared<PatternFormatter>(*sink->_override_pattern_formatter_options);
1275  }
1276 
1277  // Use the sink's override formatter
1278  log_to_write = sink->_override_pattern_formatter->format(
1279  transit_event.timestamp, thread_id, thread_name, _process_id, transit_event.logger_base->_logger_name,
1280  log_level_description, log_level_short_code, *transit_event.macro_metadata,
1281  transit_event.get_named_args(), log_message, transit_event.mdc());
1282  }
1283 
1284  // Apply filters now that we have the formatted log
1285  if (sink->apply_all_filters(transit_event.macro_metadata, transit_event.timestamp, thread_id,
1286  thread_name, transit_event.logger_base->_logger_name,
1287  transit_event.log_level(), log_message, log_to_write))
1288  {
1289  // Forward the message using the computed log statement that passed the filter
1290  sink->write_log(transit_event.macro_metadata, transit_event.timestamp, thread_id,
1291  thread_name, _process_id, transit_event.logger_base->_logger_name,
1292  transit_event.log_level(), log_level_description, log_level_short_code,
1293  transit_event.get_named_args(), log_message, log_to_write);
1294  }
1295  }
1296  }
1297 
1302  QUILL_ATTRIBUTE_HOT void _check_failure_counter(std::function<void(std::string const&)> const& error_notifier)
1303  {
1304  if (!error_notifier)
1305  {
1306  return;
1307  }
1308 
1309  for (ThreadContext* thread_context : _active_thread_contexts_cache)
1310  {
1311  size_t const failed_events_cnt = thread_context->get_and_reset_failure_counter();
1312 
1313  if (QUILL_UNLIKELY(failed_events_cnt > 0))
1314  {
1315  char timestamp[24];
1316  time_t now = time(nullptr);
1317  tm local_time;
1318  localtime_rs(&now, &local_time);
1319  strftime(timestamp, sizeof(timestamp), "%X", &local_time);
1320 
1321  if (thread_context->has_dropping_queue())
1322  {
1323  if (thread_context->has_unbounded_queue_type())
1324  {
1325  _notify_error(error_notifier,
1326  fmtquill::format("{} Quill INFO: Reached the maximum configured "
1327  "unbounded queue capacity and dropped "
1328  "{} events from thread {}",
1329  timestamp, failed_events_cnt, thread_context->thread_id()));
1330  }
1331  else
1332  {
1333  _notify_error(error_notifier,
1334  fmtquill::format("{} Quill INFO: Dropped {} events from thread {}",
1335  timestamp, failed_events_cnt, thread_context->thread_id()));
1336  }
1337  }
1338  else if (thread_context->has_blocking_queue())
1339  {
1340  if (thread_context->has_unbounded_queue_type())
1341  {
1342  _notify_error(
1343  error_notifier,
1344  fmtquill::format(
1345  "{} Quill INFO: Reached the maximum configured unbounded queue capacity and "
1346  "experienced {} blocking occurrences on thread {}",
1347  timestamp, failed_events_cnt, thread_context->thread_id()));
1348  }
1349  else
1350  {
1351  _notify_error(
1352  error_notifier,
1353  fmtquill::format("{} Quill INFO: Experienced {} blocking occurrences on thread {}",
1354  timestamp, failed_events_cnt, thread_context->thread_id()));
1355  }
1356  }
1357  }
1358  }
1359  }
1360 
1366  QUILL_ATTRIBUTE_HOT static std::pair<std::string, std::vector<std::pair<std::string, std::string>>> _process_named_args_format_message(
1367  std::string_view fmt_template)
1368  {
1369  // It would be nice to do this at compile time and store it in macro metadata, but without
1370  // constexpr vector and string in c++17 it is not possible
1371  std::string fmt_str;
1372  std::vector<std::pair<std::string, std::string>> keys;
1373  fmt_str.reserve(fmt_template.size());
1374 
1375  size_t pos = 0;
1376  while (pos < fmt_template.size())
1377  {
1378  if (fmt_template[pos] != '{')
1379  {
1380  fmt_str += fmt_template[pos];
1381  ++pos;
1382  continue;
1383  }
1384 
1385  if ((pos + 1 < fmt_template.size()) && (fmt_template[pos + 1] == '{'))
1386  {
1387  fmt_str += "{{";
1388  pos += 2;
1389  continue;
1390  }
1391 
1392  size_t const placeholder_start = pos;
1393  ++pos; // consume '{'
1394 
1395  size_t const close_bracket_pos = _find_placeholder_closing_brace(fmt_template, pos);
1396  if (close_bracket_pos == std::string_view::npos)
1397  {
1398  fmt_str += std::string{fmt_template.substr(placeholder_start)};
1399  break;
1400  }
1401 
1402  std::string_view const text_inside_placeholders = fmt_template.substr(pos, close_bracket_pos - pos);
1403  std::string_view arg_syntax;
1404  std::string_view arg_name;
1405 
1406  if (size_t const syntax_separator = text_inside_placeholders.find(':');
1407  syntax_separator != std::string_view::npos)
1408  {
1409  arg_syntax = text_inside_placeholders.substr(
1410  syntax_separator, text_inside_placeholders.size() - syntax_separator);
1411  arg_name = text_inside_placeholders.substr(0, syntax_separator);
1412  }
1413  else
1414  {
1415  arg_name = text_inside_placeholders;
1416  }
1417 
1418  std::string const normalized_arg_syntax = _normalize_named_arg_syntax(arg_syntax);
1419 
1420  fmt_str += "{";
1421  fmt_str += normalized_arg_syntax;
1422  fmt_str += "}";
1423 
1424  keys.emplace_back(_make_unique_named_arg_key(keys, arg_name), normalized_arg_syntax);
1425  pos = close_bracket_pos + 1;
1426  }
1427 
1428  return std::make_pair(fmt_str, keys);
1429  }
1430 
/**
 * Produces a key for a named argument that does not collide with the keys collected so far.
 *
 * The name itself is returned when it is not in use yet. Otherwise `_1`, `_2`, ... is appended
 * to the name until a key is found that no existing entry uses.
 *
 * @param existing_keys the (key, spec) entries collected so far; only the keys are compared
 * @param arg_name the argument name taken from the placeholder
 * @return arg_name, or arg_name followed by `_` and the first free suffix number
 */
QUILL_NODISCARD static std::string _make_unique_named_arg_key(
  std::vector<std::pair<std::string, std::string>> const& existing_keys, std::string_view arg_name)
{
  auto const is_taken = [&existing_keys](std::string const& candidate)
  {
    return std::any_of(existing_keys.begin(), existing_keys.end(),
                       [&candidate](auto const& entry) { return entry.first == candidate; });
  };

  std::string const base_name{arg_name};
  std::string candidate{base_name};

  size_t suffix = 1;
  while (is_taken(candidate))
  {
    candidate = base_name + "_" + std::to_string(suffix);
    ++suffix;
  }

  return candidate;
}
1458 
/**
 * Finds the `}` that closes a placeholder, skipping over nested replacement fields.
 *
 * Scanning starts at pos, which is expected to be just past the placeholder's opening `{`.
 * A `{{` pair is skipped as a unit. Any other `{` opens a nested field, whose matching `}` is
 * consumed without ending the search.
 *
 * @param text the text to scan
 * @param pos index at which to start scanning
 * @return index of the closing `}`, or std::string_view::npos if there is none
 */
QUILL_NODISCARD static size_t _find_placeholder_closing_brace(std::string_view text, size_t pos) noexcept
{
  size_t open_nested_fields = 0;

  for (; pos < text.size(); ++pos)
  {
    char const current = text[pos];

    if (current == '{')
    {
      bool const is_brace_pair = (pos + 1 < text.size()) && (text[pos + 1] == '{');

      if (is_brace_pair)
      {
        // step over the second '{' here, the loop increment steps over the first
        ++pos;
      }
      else
      {
        ++open_nested_fields;
      }
    }
    else if (current == '}')
    {
      if (open_nested_fields == 0)
      {
        return pos;
      }

      // this '}' closes a nested field
      --open_nested_fields;
    }
  }

  return std::string_view::npos;
}
1493 
1494  QUILL_NODISCARD static std::string _normalize_named_arg_syntax(std::string_view arg_syntax)
1495  {
1496  std::string normalized;
1497  normalized.reserve(arg_syntax.size());
1498 
1499  size_t pos = 0;
1500  while (pos < arg_syntax.size())
1501  {
1502  if (arg_syntax[pos] != '{')
1503  {
1504  normalized += arg_syntax[pos];
1505  ++pos;
1506  continue;
1507  }
1508 
1509  if ((pos + 1 < arg_syntax.size()) && (arg_syntax[pos + 1] == '{'))
1510  {
1511  normalized += "{{";
1512  pos += 2;
1513  continue;
1514  }
1515 
1516  size_t const close_bracket_pos = _find_placeholder_closing_brace(arg_syntax, pos + 1);
1517  if (close_bracket_pos == std::string_view::npos)
1518  {
1519  normalized += std::string{arg_syntax.substr(pos)};
1520  break;
1521  }
1522 
1523  normalized += "{}";
1524  pos = close_bracket_pos + 1;
1525  }
1526 
1527  return normalized;
1528  }
1529 
1536  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* _read_unbounded_frontend_queue(UnboundedSPSCQueue& frontend_queue,
1537  ThreadContext* thread_context) const
1538  {
1539  auto const read_result = frontend_queue.prepare_read();
1540 
1541  if (read_result.allocation)
1542  {
1543  if ((read_result.new_capacity < read_result.previous_capacity) && thread_context->_transit_event_buffer)
1544  {
1545  // The user explicitly requested to shrink the queue, indicating a preference for low memory
1546  // usage. To align with this intent, we also request shrinking the backend buffer.
1547  thread_context->_transit_event_buffer->request_shrink();
1548  }
1549 
1550  // When allocation_info has a value it means that the queue has re-allocated
1551  if (_options.error_notifier)
1552  {
1553  char ts[24];
1554  time_t t = time(nullptr);
1555  tm p;
1556  localtime_rs(std::addressof(t), std::addressof(p));
1557  strftime(ts, 24, "%X", std::addressof(p));
1558 
1559  // we switched to a new here, and we also notify the user of the allocation via the
1560  // error_notifier
1561  _notify_error(
1562  _options.error_notifier,
1563  fmtquill::format("{} Quill INFO: Allocated a new SPSC queue with a capacity of {} KiB "
1564  "(previously {} KiB) from thread {}",
1565  ts, (read_result.new_capacity / 1024),
1566  (read_result.previous_capacity / 1024), thread_context->thread_id()));
1567  }
1568  }
1569 
1570  return read_result.read_pos;
1571  }
1572 
1573  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool _check_frontend_queues_and_cached_transit_events_empty()
1574  {
1575  _update_active_thread_contexts_cache();
1576 
1577  bool all_empty{true};
1578 
1579  for (ThreadContext* thread_context : _active_thread_contexts_cache)
1580  {
1581  QUILL_ASSERT(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type(),
1582  "Invalid queue type in "
1583  "BackendWorker::_check_frontend_queues_and_cached_transit_events_empty()");
1584 
1585  if (thread_context->has_unbounded_queue_type())
1586  {
1587  all_empty &= thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty();
1588  }
1589  else if (thread_context->has_bounded_queue_type())
1590  {
1591  all_empty &= thread_context->get_spsc_queue_union().bounded_spsc_queue.empty();
1592  }
1593 
1594  QUILL_ASSERT(thread_context->_transit_event_buffer,
1595  "transit_event_buffer is nullptr in "
1596  "BackendWorker::_check_frontend_queues_and_cached_transit_events_empty(), "
1597  "should be valid in _active_thread_contexts_cache");
1598 
1599  all_empty &= thread_context->_transit_event_buffer->empty();
1600  }
1601 
1602  return all_empty;
1603  }
1604 
1608  QUILL_ATTRIBUTE_HOT void _resync_rdtsc_clock()
1609  {
1610  if (_rdtsc_clock.load(std::memory_order_relaxed))
1611  {
1612  // resync in rdtsc if we are not logging so that time_since_epoch() still works
1613  if (auto const now = std::chrono::steady_clock::now();
1614  (now - _last_rdtsc_resync_time) > _options.rdtsc_resync_interval)
1615  {
1616  if (_rdtsc_clock.load(std::memory_order_relaxed)->resync(2500))
1617  {
1618  _last_rdtsc_resync_time = now;
1619  }
1620  }
1621  }
1622  }
1623 
1624  /***/
1625  QUILL_ATTRIBUTE_HOT void _flush_and_run_active_sinks(bool run_periodic_tasks, std::chrono::milliseconds sink_min_flush_interval)
1626  {
1627  // Populate the active sinks cache with unique sinks, consider only the valid loggers
1628  _logger_manager.for_each_logger(
1629  [this](LoggerBase* logger)
1630  {
1631  if (logger->is_valid_logger())
1632  {
1633  for (std::shared_ptr<Sink> const& sink : logger->_sinks)
1634  {
1635  Sink* logger_sink_ptr = sink.get();
1636  auto search_it = std::find_if(_active_sinks_cache.begin(), _active_sinks_cache.end(),
1637  [logger_sink_ptr](Sink* elem)
1638  {
1639  // no one else can remove the shared pointer as this is
1640  // only running on backend thread
1641  return elem == logger_sink_ptr;
1642  });
1643 
1644  if (search_it == std::end(_active_sinks_cache))
1645  {
1646  _active_sinks_cache.push_back(logger_sink_ptr);
1647  }
1648  }
1649  }
1650 
1651  // return false to never end the loop early
1652  return false;
1653  });
1654 
1655  bool should_flush_sinks{false};
1656  std::chrono::steady_clock::time_point now;
1657 
1658  if (sink_min_flush_interval.count())
1659  {
1660  // conditional flush sinks
1661  now = std::chrono::steady_clock::now();
1662  if ((now - _last_sink_flush_time) > sink_min_flush_interval)
1663  {
1664  should_flush_sinks = true;
1665  }
1666  }
1667  else
1668  {
1669  // sink_min_flush_interval == 0 - always flush sinks
1670  should_flush_sinks = true;
1671  }
1672 
1673  for (auto const& sink : _active_sinks_cache)
1674  {
1675  QUILL_TRY
1676  {
1677  if (should_flush_sinks)
1678  {
1679  // If an exception is thrown, catch it here to prevent it from propagating
1680  // to the outer function. This prevents potential infinite loops caused by failing
1681  // flush operations.
1682  sink->flush_sink();
1683  }
1684  }
1685 #if !defined(QUILL_NO_EXCEPTIONS)
1686  QUILL_CATCH(std::exception const& e) { _notify_error(_options.error_notifier, e.what()); }
1687  QUILL_CATCH_ALL()
1688  {
1689  _notify_error(_options.error_notifier, std::string{"Caught unhandled exception."});
1690  }
1691 #endif
1692 
1693  if (run_periodic_tasks)
1694  {
1695  QUILL_TRY { sink->run_periodic_tasks(); }
1696 #if !defined(QUILL_NO_EXCEPTIONS)
1697  QUILL_CATCH(std::exception const& e) { _notify_error(_options.error_notifier, e.what()); }
1698  QUILL_CATCH_ALL()
1699  {
1700  _notify_error(_options.error_notifier, std::string{"Caught unhandled exception."});
1701  }
1702 #endif
1703  }
1704  }
1705 
1706  // Only update the timestamp after we actually attempted to flush
1707  if (should_flush_sinks && sink_min_flush_interval.count() && !_active_sinks_cache.empty())
1708  {
1709  _last_sink_flush_time = now;
1710  }
1711 
1712  _active_sinks_cache.clear();
1713  }
1714 
1718  QUILL_ATTRIBUTE_HOT void _update_active_thread_contexts_cache(bool force = false)
1719  {
1720  // Check if _thread_contexts has changed. This can happen only when a new thread context is added by any Logger
1721  if (QUILL_UNLIKELY(force || _thread_context_manager.new_thread_context_flag()))
1722  {
1723  _active_thread_contexts_cache.clear();
1724  _thread_context_manager.for_each_thread_context(
1725  [this](ThreadContext* thread_context)
1726  {
1727  if (!thread_context->_transit_event_buffer)
1728  {
1729  // Lazy initialise the _transit_event_buffer for this thread_context
1730  thread_context->_transit_event_buffer =
1731  std::make_shared<TransitEventBuffer>(_options.transit_event_buffer_initial_capacity);
1732  }
1733 
1734  // We do not skip invalidated && empty queue thread contexts as this is very rare,
1735  // so instead we just add them and expect them to be cleaned in the next iteration
1736  _active_thread_contexts_cache.push_back(thread_context);
1737  });
1738  }
1739  }
1740 
1747  QUILL_ATTRIBUTE_HOT void _cleanup_invalidated_thread_contexts()
1748  {
1749  if (!_thread_context_manager.has_invalid_thread_context())
1750  {
1751  return;
1752  }
1753 
1754  auto find_invalid_and_empty_thread_context_callback = [](ThreadContext* thread_context)
1755  {
1756  // If the thread context is invalid it means the thread that created it has now died.
1757  // We also want to empty the queue from all LogRecords before removing the thread context
1758  if (!thread_context->is_valid())
1759  {
1760  QUILL_ASSERT(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type(),
1761  "Invalid queue type in BackendWorker::_update_active_thread_contexts_cache()");
1762 
1763  QUILL_ASSERT(thread_context->_transit_event_buffer,
1764  "transit_event_buffer is nullptr in "
1765  "BackendWorker::_update_active_thread_contexts_cache(), should be valid in "
1766  "_active_thread_contexts_cache");
1767 
1768  // detect empty queue
1769  if (thread_context->has_unbounded_queue_type())
1770  {
1771  return thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty() &&
1772  thread_context->_transit_event_buffer->empty();
1773  }
1774 
1775  if (thread_context->has_bounded_queue_type())
1776  {
1777  return thread_context->get_spsc_queue_union().bounded_spsc_queue.empty() &&
1778  thread_context->_transit_event_buffer->empty();
1779  }
1780  }
1781 
1782  return false;
1783  };
1784 
1785  // First we iterate our existing cache and we look for any invalidated contexts
1786  auto found_invalid_and_empty_thread_context =
1787  std::find_if(_active_thread_contexts_cache.begin(), _active_thread_contexts_cache.end(),
1788  find_invalid_and_empty_thread_context_callback);
1789 
1790  while (QUILL_UNLIKELY(found_invalid_and_empty_thread_context != std::end(_active_thread_contexts_cache)))
1791  {
1792  // if we found anything then remove it - Here if we have more than one to remove we will
1793  // try to acquire the lock multiple times, but it should be fine as it is unlikely to have
1794  // that many to remove
1795  _thread_context_manager.remove_shared_invalidated_thread_context(*found_invalid_and_empty_thread_context);
1796 
1797  // We also need to remove it from _thread_context_cache, that is used only by the backend
1798  _active_thread_contexts_cache.erase(found_invalid_and_empty_thread_context);
1799 
1800  // And then look again
1801  found_invalid_and_empty_thread_context =
1802  std::find_if(_active_thread_contexts_cache.begin(), _active_thread_contexts_cache.end(),
1803  find_invalid_and_empty_thread_context_callback);
1804  }
1805  }
1806 
1810  QUILL_ATTRIBUTE_HOT void _cleanup_invalidated_loggers()
1811  {
1812  // since there are no messages we can check for invalidated loggers and clean them up
1813  _logger_manager.cleanup_invalidated_loggers(
1814  [this]()
1815  {
1816  // check the queues are empty each time before removing a logger to avoid
1817  // potential race condition of the logger* still being in the queue
1818  return _check_frontend_queues_and_cached_transit_events_empty();
1819  },
1820  _removed_loggers);
1821 
1822  if (!_removed_loggers.empty())
1823  {
1824  // if loggers were removed also check for sinks to remove
1825  // cleanup_unused_sinks is expensive and should be only called when it is needed
1826  _sink_manager.cleanup_unused_sinks();
1827 
1828  for (auto const& removed_logger_name : _removed_loggers)
1829  {
1830  // Notify the user if the blocking call was used
1831  auto search_it = _logger_removal_flags.find(removed_logger_name);
1832  if (search_it != _logger_removal_flags.end())
1833  {
1834  search_it->second->store(true);
1835  _logger_removal_flags.erase(search_it);
1836  }
1837  }
1838 
1839  _removed_loggers.clear();
1840  }
1841  }
1842 
1847  QUILL_ATTRIBUTE_HOT void _try_shrink_empty_transit_event_buffers()
1848  {
1849  for (ThreadContext* thread_context : _active_thread_contexts_cache)
1850  {
1851  if (thread_context->_transit_event_buffer)
1852  {
1853  thread_context->_transit_event_buffer->try_shrink();
1854  }
1855  }
1856  }
1857 
1858  void _apply_mdc_event(ThreadContext& thread_context, MacroMetadata::Event event,
1859  FormatArgsDecoder format_args_decoder, std::byte*& read_pos)
1860  {
1861  if (event == MacroMetadata::Event::MdcSet)
1862  {
1863  format_args_decoder(read_pos, _format_args_store);
1864 
1865  QUILL_ASSERT((_format_args_store.size() % 2) == 0,
1866  "MdcSet decoded argument count mismatch in BackendWorker::_apply_mdc_event()");
1867 
1868  if (!thread_context._backend_mdc_state)
1869  {
1870  thread_context._backend_mdc_state = std::make_shared<BackendMdcState>(_options.mdc_format_pattern);
1871  }
1872 
1873  BackendMdcState& mdc_state = *thread_context._backend_mdc_state;
1874  auto const field_count = static_cast<uint32_t>(_format_args_store.size() / 2);
1875 
1876  _mdc_fields.clear();
1877  _mdc_fields.reserve(field_count);
1878 
1879  QUILL_TRY
1880  {
1881  for (uint32_t i = 0; i < field_count; ++i)
1882  {
1883  int const key_index = static_cast<int>(i * 2u);
1884  int const value_index = key_index + 1;
1885 
1886  std::string const key = fmtquill::vformat(
1887  "{}", fmtquill::basic_format_args<fmtquill::format_context>{_format_args_store.data() + key_index, 1});
1888  std::string const value = fmtquill::vformat(
1889  "{}", fmtquill::basic_format_args<fmtquill::format_context>{_format_args_store.data() + value_index, 1});
1890 
1891  _mdc_fields.emplace_back(key, value);
1892  }
1893  }
1894 #if !defined(QUILL_NO_EXCEPTIONS)
1895  QUILL_CATCH(std::exception const& e)
1896  {
1897  _notify_error(_options.error_notifier, e.what());
1898  return;
1899  }
1900  QUILL_CATCH_ALL()
1901  {
1902  _notify_error(_options.error_notifier, std::string{"Caught unhandled exception."});
1903  return;
1904  }
1905 #endif
1906 
1907  for (auto const& [key, value] : _mdc_fields)
1908  {
1909  mdc_state.set(key, value);
1910  }
1911 
1912  QUILL_TRY { mdc_state.rebuild_formatted_mdc(); }
1913 #if !defined(QUILL_NO_EXCEPTIONS)
1914  QUILL_CATCH(std::exception const& e) { _notify_error(_options.error_notifier, e.what()); }
1915  QUILL_CATCH_ALL()
1916  {
1917  _notify_error(_options.error_notifier, std::string{"Caught unhandled exception."});
1918  }
1919 #endif
1920  }
1921  else if (event == MacroMetadata::Event::MdcErase)
1922  {
1923  format_args_decoder(read_pos, _format_args_store);
1924 
1925  QUILL_ASSERT(_format_args_store.size() >= 1,
1926  "MdcErase decoded argument count mismatch in BackendWorker::_apply_mdc_event()");
1927 
1928  if (thread_context._backend_mdc_state)
1929  {
1930  _mdc_keys.clear();
1931  _mdc_keys.reserve(static_cast<size_t>(_format_args_store.size()));
1932 
1933  QUILL_TRY
1934  {
1935  for (int i = 0; i < _format_args_store.size(); ++i)
1936  {
1937  _mdc_keys.emplace_back(fmtquill::vformat(
1938  "{}", fmtquill::basic_format_args<fmtquill::format_context>{_format_args_store.data() + i, 1}));
1939  }
1940  }
1941 #if !defined(QUILL_NO_EXCEPTIONS)
1942  QUILL_CATCH(std::exception const& e)
1943  {
1944  _notify_error(_options.error_notifier, e.what());
1945  return;
1946  }
1947  QUILL_CATCH_ALL()
1948  {
1949  _notify_error(_options.error_notifier, std::string{"Caught unhandled exception."});
1950  return;
1951  }
1952 #endif
1953 
1954  for (std::string const& key : _mdc_keys)
1955  {
1956  thread_context._backend_mdc_state->erase(key);
1957  }
1958 
1959  if (thread_context._backend_mdc_state->empty())
1960  {
1961  thread_context._backend_mdc_state.reset();
1962  }
1963  else
1964  {
1965  QUILL_TRY { thread_context._backend_mdc_state->rebuild_formatted_mdc(); }
1966 #if !defined(QUILL_NO_EXCEPTIONS)
1967  QUILL_CATCH(std::exception const& e) { _notify_error(_options.error_notifier, e.what()); }
1968  QUILL_CATCH_ALL()
1969  {
1970  _notify_error(_options.error_notifier, std::string{"Caught unhandled exception."});
1971  }
1972 #endif
1973  }
1974  }
1975  }
1976  else if (event == MacroMetadata::Event::MdcClear)
1977  {
1978  thread_context._backend_mdc_state.reset();
1979  }
1980  else
1981  {
1982  QUILL_ASSERT(false, "Unexpected MDC event in BackendWorker::_apply_mdc_event()");
1983  }
1984  }
1985 
1986  void _set_transit_event_mdc(ThreadContext const& thread_context, TransitEvent* transit_event)
1987  {
1988  if (thread_context._backend_mdc_state)
1989  {
1990  std::string_view const mdc = thread_context._backend_mdc_state->formatted_mdc();
1991  transit_event->ensure_extra_data();
1992  transit_event->extra_data->mdc.assign(mdc.data(), mdc.size());
1993  }
1994  }
1995 
1997  static void _format_and_split_arguments(std::vector<std::pair<std::string, std::string>> const& orig_arg_names,
1998  std::vector<std::pair<std::string, std::string>>& named_args,
1999  DynamicFormatArgStore const& format_args_store,
2000  BackendOptions const& options)
2001  {
2002  for (size_t i = 0; i < named_args.size(); ++i)
2003  {
2004  if (i >= static_cast<size_t>(format_args_store.size()))
2005  {
2006  break;
2007  }
2008 
2009  std::string format_string{"{}"};
2010 
2011  // orig_arg_names[i].second stores the normalized format syntax, for example ":.2f" or ":{}".
2012  if ((i < orig_arg_names.size()) && !orig_arg_names[i].second.empty())
2013  {
2014  format_string = "{";
2015  format_string += orig_arg_names[i].second;
2016  format_string += "}";
2017  }
2018 
2019  fmtquill::vformat_to(std::back_inserter(named_args[i].second), format_string,
2020  fmtquill::basic_format_args<fmtquill::format_context>{
2021  format_args_store.data() + static_cast<std::ptrdiff_t>(i),
2022  format_args_store.size() - static_cast<int>(i)});
2023 
2024  if (options.check_printable_char && format_args_store.has_string_related_type())
2025  {
2026  sanitize_non_printable_chars(named_args[i].second, options);
2027  }
2028  }
2029  }
2030 
2031  void _populate_formatted_named_args(TransitEvent* transit_event,
2032  std::vector<std::pair<std::string, std::string>> const& arg_names)
2033  {
2034  transit_event->ensure_extra_data();
2035 
2036  auto* named_args = &transit_event->extra_data->named_args;
2037  named_args->clear();
2038 
2039  if (arg_names.empty())
2040  {
2041  return;
2042  }
2043 
2044  named_args->resize(arg_names.size());
2045 
2046  // We first populate the arg names in the transit buffer
2047  for (size_t i = 0; i < arg_names.size(); ++i)
2048  {
2049  (*named_args)[i].first = arg_names[i].first;
2050  }
2051 
2052  for (size_t i = arg_names.size(); i < static_cast<size_t>(_format_args_store.size()); ++i)
2053  {
2054  // we do not have a named_arg for the argument value here so we just append its index as a placeholder
2055  named_args->push_back(std::pair<std::string, std::string>(fmtquill::format("_{}", i), std::string{}));
2056  }
2057 
2058  // Then populate all the values of each arg
2059  QUILL_TRY { _format_and_split_arguments(arg_names, *named_args, _format_args_store, _options); }
2060 #if !defined(QUILL_NO_EXCEPTIONS)
2061  QUILL_CATCH(std::exception const&)
2062  {
2063  // This catch block simply catches the exception.
2064  // Since the error has already been handled in _populate_formatted_log_message,
2065  // there is no additional action required here.
2066  }
2067  QUILL_CATCH_ALL()
2068  {
2069  _notify_error(_options.error_notifier, std::string{"Caught unhandled exception."});
2070  }
2071 #endif
2072  }
2073 
2074  QUILL_ATTRIBUTE_HOT void _populate_formatted_log_message(TransitEvent* transit_event, char const* message_format)
2075  {
2076  transit_event->formatted_msg->clear();
2077 
2078  QUILL_TRY
2079  {
2080  fmtquill::vformat_to(std::back_inserter(*transit_event->formatted_msg), message_format,
2081  fmtquill::basic_format_args<fmtquill::format_context>{
2082  _format_args_store.data(), _format_args_store.size()});
2083 
2084  if (_options.check_printable_char && _format_args_store.has_string_related_type())
2085  {
2086  sanitize_non_printable_chars(*transit_event->formatted_msg, _options);
2087  }
2088  }
2089 #if !defined(QUILL_NO_EXCEPTIONS)
2090  QUILL_CATCH(std::exception const& e)
2091  {
2092  transit_event->formatted_msg->clear();
2093  std::string const error =
2094  fmtquill::format(R"([Could not format log statement. message: "{}", location: "{}", error: "{}"])",
2095  transit_event->macro_metadata->message_format(),
2096  transit_event->macro_metadata->short_source_location(), e.what());
2097 
2098  transit_event->formatted_msg->append(error);
2099  _notify_error(_options.error_notifier, error);
2100  }
2101  QUILL_CATCH_ALL()
2102  {
2103  transit_event->formatted_msg->clear();
2104  std::string const error = fmtquill::format(
2105  R"([Could not format log statement. message: "{}", location: "{}", error: "{}"])",
2106  transit_event->macro_metadata->message_format(),
2107  transit_event->macro_metadata->short_source_location(), "Caught unhandled exception.");
2108 
2109  transit_event->formatted_msg->append(error);
2110  _notify_error(_options.error_notifier, error);
2111  }
2112 #endif
2113  }
2114 
2115  void _apply_runtime_metadata(std::byte*& read_pos, TransitEvent* transit_event)
2116  {
2117  char const* fmt;
2118  char const* file;
2119  char const* function;
2120  char const* tags;
2121 
2122  if (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataDeepCopy)
2123  {
2124  fmt = Codec<char const*>::decode_arg(read_pos);
2125  file = Codec<char const*>::decode_arg(read_pos);
2126  function = Codec<char const*>::decode_arg(read_pos);
2127  tags = Codec<char const*>::decode_arg(read_pos);
2128  }
2129  else if (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataShallowCopy)
2130  {
2131  fmt = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
2132  file = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
2133  function = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
2134  tags = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
2135  }
2136  else if (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataHybridCopy)
2137  {
2138  fmt = Codec<char const*>::decode_arg(read_pos);
2139  file = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
2140  function = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
2141  tags = Codec<char const*>::decode_arg(read_pos);
2142  }
2143  else
2144  {
2145  QUILL_THROW(
2146  QuillError{"Unexpected event type in _apply_runtime_metadata. This should never happen."});
2147  }
2148 
2149  auto const line = Codec<uint32_t>::decode_arg(read_pos);
2150  auto const log_level = Codec<LogLevel>::decode_arg(read_pos);
2151 
2152  auto temp = TransitEvent::RuntimeMetadata{file, line, function, tags, fmt, log_level};
2153 
2154  transit_event->ensure_extra_data();
2155  transit_event->extra_data->runtime_metadata = temp;
2156 
2157  // point to the runtime metadata
2158  transit_event->macro_metadata = &transit_event->extra_data->runtime_metadata.macro_metadata;
2159  }
2160 
2161  template <typename TFormattedMsg>
2162  static void sanitize_non_printable_chars(TFormattedMsg& formatted_msg, BackendOptions const& options)
2163  {
2164  // check for non-printable characters in the formatted_msg
2165  bool contains_non_printable_char{false};
2166 
2167  for (char c : formatted_msg)
2168  {
2169  if (!options.check_printable_char(c))
2170  {
2171  contains_non_printable_char = true;
2172  break;
2173  }
2174  }
2175 
2176  if (contains_non_printable_char)
2177  {
2178  // in this rare event we will replace the non-printable chars with their hex values
2179  std::string const formatted_msg_copy = {formatted_msg.data(), formatted_msg.size()};
2180  formatted_msg.clear();
2181 
2182  for (char c : formatted_msg_copy)
2183  {
2184  if (options.check_printable_char(c))
2185  {
2186  formatted_msg.push_back(c);
2187  }
2188  else
2189  {
2190  // convert non-printable character to hex
2191  unsigned char const byte = static_cast<unsigned char>(c);
2192  constexpr char hex[] = "0123456789ABCDEF";
2193  formatted_msg.push_back('\\');
2194  formatted_msg.push_back('x');
2195  formatted_msg.push_back(hex[(byte >> 4) & 0xF]);
2196  formatted_msg.push_back(hex[byte & 0xF]);
2197  }
2198  }
2199  }
2200  }
2201 
2202 private:
2203  friend class quill::ManualBackendWorker;
2204 
2205  std::unique_ptr<BackendWorkerLock> _backend_worker_lock;
2206  ThreadContextManager& _thread_context_manager = ThreadContextManager::instance();
2207  SinkManager& _sink_manager = SinkManager::instance();
2208  LoggerManager& _logger_manager = LoggerManager::instance();
2209  BackendOptions _options;
2210  uint64_t _last_output_timestamp{0};
2211  std::thread _worker_thread;
2212 
2213  DynamicFormatArgStore _format_args_store;
2214  std::vector<std::string> _removed_loggers;
2215  std::vector<ThreadContext*> _active_thread_contexts_cache;
2216  std::vector<Sink*> _active_sinks_cache;
2217  std::vector<std::pair<std::string, std::string>> _mdc_fields;
2218  std::vector<std::string> _mdc_keys;
2219  std::unordered_map<std::string, std::pair<std::string, std::vector<std::pair<std::string, std::string>>>> _named_args_templates;
2220  std::unordered_map<std::string, std::atomic<bool>*> _logger_removal_flags;
2221  std::string _named_args_format_template;
2222  std::string _process_id;
2223  std::chrono::steady_clock::time_point _last_rdtsc_resync_time;
2224  std::chrono::steady_clock::time_point _last_sink_flush_time;
2225  std::atomic<uint32_t> _worker_thread_id{0};
2226  std::atomic<bool> _is_worker_running{false};
2227  std::atomic<bool> _has_worker_thread_exited{true};
2229  alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<RdtscClock*> _rdtsc_clock{
2230  nullptr};
2231  alignas(QUILL_CACHE_LINE_ALIGNED) std::mutex _wake_up_mutex;
2232  std::condition_variable _wake_up_cv;
2233  bool _wake_up_flag{false};
2234 };
2235 
2236 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
2237  #pragma warning(pop)
2238 #endif
2239 
2240 } // namespace detail
2241 
2242 QUILL_END_NAMESPACE
QUILL_ATTRIBUTE_HOT void sleep_for_ns(uint64_t ns) noexcept
Mirrors std::this_thread::sleep_for(std::chrono::nanoseconds{ns}).
Definition: ThreadPrimitives.h:43
std::unique_ptr< ExtraData > extra_data
buffer for message
Definition: TransitEvent.h:271
A single-producer single-consumer FIFO circular buffer.
Definition: UnboundedSPSCQueue.h:43
void clear()
Erase all elements from the store.
Definition: DynamicFormatArgStore.h:149
void notify()
Wakes up the backend worker thread.
Definition: BackendWorker.h:300
std::function< void()> backend_worker_on_poll_end
Optional hook executed by the backend worker thread at the end of each poll iteration.
Definition: BackendOptions.h:257
Base class for sinks.
Definition: Sink.h:46
QUILL_ATTRIBUTE_COLD void run(BackendOptions const &options)
Starts the backend worker thread.
Definition: BackendWorker.h:141
bool enable_yield_when_idle
The backend employs "busy-waiting" by spinning around each frontend thread's queue.
Definition: BackendOptions.h:67
Definition: TransitEvent.h:33
~BackendWorker()
Destructor.
Definition: BackendWorker.h:97
Definition: TransitEvent.h:183
QUILL_NODISCARD uint64_t time_since_epoch(uint64_t rdtsc_value) const
Access the rdtsc class from any thread to convert an rdtsc value to wall time.
Definition: BackendWorker.h:115
Captures and stores information about a logging event in compile time.
Definition: MacroMetadata.h:24
Definition: ThreadContextManager.h:223
void(*)(std::byte *&data, DynamicFormatArgStore &args_store) FormatArgsDecoder
Decode functions.
Definition: Codec.h:415
Definition: BackendMdcState.h:21
std::function< void(std::string const &)> error_notifier
The backend may encounter exceptions that cannot be caught within user threads.
Definition: BackendOptions.h:241
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED std::string get_thread_name()
Returns the name of the thread.
Definition: ThreadUtilities.h:150
std::chrono::nanoseconds sleep_duration
Specifies the duration the backend sleeps if there is no remaining work to process in the queues...
Definition: BackendOptions.h:72
Similar to fmt::dynamic_arg_store but better suited to our needs e.g.
Definition: DynamicFormatArgStore.h:79
tm * localtime_rs(time_t const *timer, tm *buf)
Portable localtime_r or _s per operating system.
Definition: TimeUtilities.h:56
Definition: LogFunctions.h:269
QUILL_NODISCARD bool is_valid_logger() const noexcept
Checks if the logger is valid.
Definition: LoggerBase.h:115
void for_each_logger(TCallback cb) const
For backend use only.
Definition: LoggerManager.h:141
Sets up a signal handler to handle fatal signals.
Definition: BackendManager.h:28
BackendWorker()=default
Constructor.
Codec contract notes (apply to this primary template and every specialization, including those in the...
Definition: Codec.h:168
Converts tsc ticks to nanoseconds since epoch.
Definition: RdtscClock.h:36
MetricMetadata extends MacroMetadata so the existing header-encoding path can carry it in the macro_m...
Definition: Metric.h:35
Definition: base.h:2200
size_t transit_events_hard_limit
The backend gives priority to reading messages from the frontend queues and temporarily buffers them...
Definition: BackendOptions.h:115
QUILL_ATTRIBUTE_COLD void stop() noexcept
Stops the backend worker thread.
Definition: BackendWorker.h:248
std::array< std::string, LogLevelCount > log_level_descriptions
Holds descriptive names for various log levels used in logging operations.
Definition: BackendOptions.h:311
This class can be used when you want to run the backend worker on your own thread.
Definition: ManualBackendWorker.h:30
Definition: LoggerBase.h:39
custom exception
Definition: QuillError.h:47
QUILL_NODISCARD uint32_t get_backend_thread_id() const noexcept
Get the backend worker's thread id.
Definition: BackendWorker.h:132
bool ensure_monotonic_output_timestamps
Ensures sink-visible timestamps for regular log and metric records do not move backwards.
Definition: BackendOptions.h:189
std::string mdc_format_pattern
Format string used when rendering PatternFormatter's %(mdc).
Definition: BackendOptions.h:340
bool wait_for_queues_to_empty_before_exit
When this option is enabled and the application is terminating, the backend worker thread will not ex...
Definition: BackendOptions.h:206
size_t transit_events_soft_limit
The backend gives priority to reading messages from the frontend queues of all the hot threads and te...
Definition: BackendOptions.h:98
QUILL_ATTRIBUTE_HOT void yield_thread() noexcept
Mirrors std::this_thread::yield().
Definition: ThreadPrimitives.h:71
QUILL_NODISCARD constexpr bool is_power_of_two(uint64_t number) noexcept
Check if a number is a power of 2.
Definition: MathUtilities.h:25
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ReadResult prepare_read()
Prepare to read from the buffer.
Definition: UnboundedSPSCQueue.h:209
Definition: SinkManager.h:34
bool check_backend_singleton_instance
Enables a runtime check to detect multiple instances of the backend singleton.
Definition: BackendOptions.h:362
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT uint64_t get_system_time_ns() noexcept
Mirrors std::chrono::duration_cast<std::chrono::nanoseconds>( std::chrono::system_clock::now().time_since_epoch()).count().
Definition: ChronoTimeUtils.h:57
std::string thread_name
The name assigned to the backend, visible during thread naming queries (e.g., pthread_getname_np) or ...
Definition: BackendOptions.h:59
std::function< void()> backend_worker_on_poll_begin
Optional hook executed by the backend worker thread at the start of each poll iteration.
Definition: BackendOptions.h:249
std::vector< uint16_t > cpu_affinity
Pins the backend to the specified CPUs.
Definition: BackendOptions.h:220
Definition: BackendWorker.h:80
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED uint32_t get_thread_id() noexcept
Returns the os assigned ID of the thread.
Definition: ThreadUtilities.h:213
Definition: LoggerManager.h:39
Configuration options for the backend.
Definition: BackendOptions.h:51
std::chrono::microseconds log_timestamp_ordering_grace_period
The backend iterates through all frontend lock-free queues and pops all messages from each queue...
Definition: BackendOptions.h:165
std::array< std::string, LogLevelCount > log_level_short_codes
Short codes or identifiers for each log level.
Definition: BackendOptions.h:321
std::function< bool(char c)> check_printable_char
This option enables a check that verifies the log message contains only printable characters before f...
Definition: BackendOptions.h:304
Definition: ThreadContextManager.h:54
std::chrono::milliseconds rdtsc_resync_interval
This option is only applicable if at least one frontend is using a Logger with ClockSourceType::Tsc.
Definition: BackendOptions.h:272
std::chrono::milliseconds sink_min_flush_interval
This option specifies the minimum time interval (in milliseconds) before the backend thread flushes t...
Definition: BackendOptions.h:289