Program Listing for File batch_span_processor.h

Return to documentation for file (/tmp/B.puc0r6hi/BUILD/opentelemetry-cpp-1.27.0-build/opentelemetry-cpp-1.27.0/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h)

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once

#include <stddef.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>

#include "opentelemetry/sdk/common/circular_buffer.h"
#include "opentelemetry/sdk/trace/batch_span_processor_options.h"
#include "opentelemetry/sdk/trace/batch_span_processor_runtime_options.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk/trace/processor.h"
#include "opentelemetry/sdk/trace/recordable.h"
#include "opentelemetry/trace/span_context.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace trace
{

/**
 * A SpanProcessor that holds a SpanExporter, a circular buffer of Recordable objects and a
 * background worker thread (see the data members at the bottom of the class).
 *
 * Objects of this type can be neither copied nor moved.
 */
class BatchSpanProcessor : public SpanProcessor
{
public:
  /**
   * Build a processor from an exporter and the batching options.
   *
   * @param exporter exporter handed over as an rvalue reference; presumably moved into
   *                 exporter_ by the out-of-line definition (not visible here)
   * @param options  batching options; presumably the source of the queue size, schedule
   *                 delay and batch size members (confirm in the definition)
   */
  BatchSpanProcessor(std::unique_ptr<SpanExporter> &&exporter,
                     const BatchSpanProcessorOptions &options);

  /**
   * Same as the two-argument constructor, with additional runtime options.
   *
   * @param exporter        exporter handed over as an rvalue reference
   * @param options         batching options
   * @param runtime_options runtime options; NOTE(review): likely where
   *                        worker_thread_instrumentation_ comes from, confirm in the definition
   */
  BatchSpanProcessor(std::unique_ptr<SpanExporter> &&exporter,
                     const BatchSpanProcessorOptions &options,
                     const BatchSpanProcessorRuntimeOptions &runtime_options);

  // Copy and move operations are all deleted.
  BatchSpanProcessor(const BatchSpanProcessor &)            = delete;
  BatchSpanProcessor(BatchSpanProcessor &&)                 = delete;
  BatchSpanProcessor &operator=(const BatchSpanProcessor &) = delete;
  BatchSpanProcessor &operator=(BatchSpanProcessor &&)      = delete;

  /**
   * SpanProcessor override.
   *
   * @return a newly owned Recordable
   */
  std::unique_ptr<Recordable> MakeRecordable() noexcept override;

  /**
   * SpanProcessor override, given a span and the context of its parent.
   *
   * @param span           the recordable of the span
   * @param parent_context span context of the parent
   */
  void OnStart(Recordable &span,
               const opentelemetry::trace::SpanContext &parent_context) noexcept override;

  /**
   * SpanProcessor override, given ownership of a span's recordable.
   *
   * @param span the recordable; presumably pushed into buffer_, which holds the ended spans
   */
  void OnEnd(std::unique_ptr<Recordable> &&span) noexcept override;

  /**
   * SpanProcessor override.
   *
   * @param timeout time budget for the call. The default is the largest representable
   *                microseconds value; `max` is wrapped in parentheses so that a
   *                function-like `max` macro cannot expand at this spot.
   * @return a bool; presumably true on success (confirm in the definition)
   */
  bool ForceFlush(
      std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override;

  /**
   * SpanProcessor override.
   *
   * @param timeout time budget for the call; defaults to the largest representable
   *                microseconds value
   * @return a bool; presumably true on success (confirm in the definition)
   */
  bool Shutdown(
      std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override;

  ~BatchSpanProcessor() override;

protected:
  /**
   * Non-virtual helper with the same parameter and return shape as Shutdown().
   *
   * @param timeout time budget for the call; defaults to the largest representable
   *                microseconds value
   */
  bool InternalShutdown(
      std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

  // NOTE(review): presumably the body executed by worker_thread_ - confirm in the definition.
  void DoBackgroundWork();

  // Virtual, so a derived class is able to replace the export step.
  virtual void Export();

  // NOTE(review): presumably empties buffer_ - confirm in the definition.
  void DrainQueue();

  /**
   * Bundle of the condition variables, mutexes and atomics used by the processor.
   * The processor keeps it behind a std::shared_ptr (synchronization_data_).
   */
  struct SynchronizationData
  {
    // Synchronization primitives: condition variables first, then mutexes.
    std::condition_variable cv;
    std::condition_variable force_flush_cv;
    std::mutex cv_m;
    std::mutex force_flush_cv_m;
    std::mutex shutdown_m;

    // Flags and counters that drive the workflow of the processor.
    std::atomic<bool> is_force_wakeup_background_worker{false};
    std::atomic<bool> is_shutdown{false};
    std::atomic<uint64_t> force_flush_pending_sequence{0};
    std::atomic<uint64_t> force_flush_notified_sequence{0};
    std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};

    // Keep this constructor user-provided rather than writing `= default`: some GCC and Clang
    // versions have bugs that may leave the members incorrectly initialized. See also
    // https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be
    SynchronizationData() {}
  };

  /**
   * Static helper; it receives everything it works on through its parameters.
   *
   * @param notify_force_flush   a 64-bit value; NOTE(review): looks like a force-flush
   *                             sequence number (compare the force_flush_*_sequence
   *                             atomics), confirm in the definition
   * @param exporter             the exporter
   * @param synchronization_data the shared synchronization state
   */
  static void NotifyCompletion(uint64_t notify_force_flush,
                               const std::unique_ptr<SpanExporter> &exporter,
                               const std::shared_ptr<SynchronizationData> &synchronization_data);

  /**
   * Helper operating on a timeout and a start time, both taken by non-const reference.
   *
   * @param timeout    timeout value; may be modified by the call
   * @param start_time system_clock time point; may be modified by the call
   */
  void GetWaitAdjustedTime(std::chrono::microseconds &timeout,
                           std::chrono::system_clock::time_point &start_time);

  // The configured backend exporter.
  std::unique_ptr<SpanExporter> exporter_;

  // Configurable parameters as per the official specs.
  const size_t max_queue_size_;
  const std::chrono::milliseconds schedule_delay_millis_;
  const size_t max_export_batch_size_;

  // The buffer/queue to which the ended spans are added.
  opentelemetry::sdk::common::CircularBuffer<Recordable> buffer_;

  // Shared synchronization state (see SynchronizationData).
  std::shared_ptr<SynchronizationData> synchronization_data_;

  // The background worker thread, preceded by its instrumentation object.
  std::shared_ptr<sdk::common::ThreadInstrumentation> worker_thread_instrumentation_;
  std::thread worker_thread_;
};

}  // namespace trace
}  // namespace sdk
OPENTELEMETRY_END_NAMESPACE