Program Listing for File batch_log_record_processor.h
↰ Return to documentation for file (/tmp/B.puc0r6hi/BUILD/opentelemetry-cpp-1.27.0-build/opentelemetry-cpp-1.27.0/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h)
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
#pragma once
#include <stddef.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>
#include "opentelemetry/sdk/common/circular_buffer.h"
#include "opentelemetry/sdk/logs/batch_log_record_processor_options.h"
#include "opentelemetry/sdk/logs/batch_log_record_processor_runtime_options.h"
#include "opentelemetry/sdk/logs/exporter.h"
#include "opentelemetry/sdk/logs/processor.h"
#include "opentelemetry/sdk/logs/recordable.h"
#include "opentelemetry/version.h"
OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace logs
{
class BatchLogRecordProcessor : public LogRecordProcessor
{
public:
explicit BatchLogRecordProcessor(
std::unique_ptr<LogRecordExporter> &&exporter,
const size_t max_queue_size = 2048,
const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000),
const size_t max_export_batch_size = 512);
explicit BatchLogRecordProcessor(std::unique_ptr<LogRecordExporter> &&exporter,
const BatchLogRecordProcessorOptions &options);
explicit BatchLogRecordProcessor(std::unique_ptr<LogRecordExporter> &&exporter,
const BatchLogRecordProcessorOptions &options,
const BatchLogRecordProcessorRuntimeOptions &runtime_options);
BatchLogRecordProcessor(const BatchLogRecordProcessor &) = delete;
BatchLogRecordProcessor(BatchLogRecordProcessor &&) = delete;
BatchLogRecordProcessor &operator=(const BatchLogRecordProcessor &) = delete;
BatchLogRecordProcessor &operator=(BatchLogRecordProcessor &&) = delete;
std::unique_ptr<Recordable> MakeRecordable() noexcept override;
void OnEmit(std::unique_ptr<Recordable> &&record) noexcept override;
bool ForceFlush(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override;
bool Shutdown(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override;
~BatchLogRecordProcessor() override;
protected:
bool InternalShutdown(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;
void DoBackgroundWork();
virtual void Export();
void DrainQueue();
struct SynchronizationData
{
/* Synchronization primitives */
std::condition_variable cv, force_flush_cv;
std::mutex cv_m, force_flush_cv_m, shutdown_m;
/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_force_wakeup_background_worker{false};
std::atomic<bool> is_shutdown{false};
std::atomic<uint64_t> force_flush_pending_sequence{0};
std::atomic<uint64_t> force_flush_notified_sequence{0};
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};
// Do not use SynchronizationData() = default; here, some versions of GCC&Clang have BUGs
// and may not initialize the member correctly. See also
// https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be
inline SynchronizationData() {}
};
static void NotifyCompletion(uint64_t notify_force_flush,
const std::unique_ptr<LogRecordExporter> &exporter,
const std::shared_ptr<SynchronizationData> &synchronization_data);
void GetWaitAdjustedTime(std::chrono::microseconds &timeout,
std::chrono::time_point<std::chrono::system_clock> &start_time);
/* The configured backend log exporter */
std::unique_ptr<LogRecordExporter> exporter_;
/* Configurable parameters as per the official *trace* specs */
const size_t max_queue_size_;
const std::chrono::milliseconds scheduled_delay_millis_;
const size_t max_export_batch_size_;
/* The buffer/queue to which the ended logs are added */
opentelemetry::sdk::common::CircularBuffer<Recordable> buffer_;
std::shared_ptr<SynchronizationData> synchronization_data_;
/* The background worker thread */
std::shared_ptr<sdk::common::ThreadInstrumentation> worker_thread_instrumentation_;
std::thread worker_thread_;
};
} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE