diff --git a/tests/test_tracing.py b/tests/test_tracing.py index 37950b67..5cc855d7 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -29,13 +29,18 @@ ) from veadk.tracing.telemetry.opentelemetry_tracer import OpentelemetryTracer +from opentelemetry import trace as trace_api +from opentelemetry.sdk import trace as trace_sdk +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, + ) + APP_NAME = "app" USER_ID = "testuser" SESSION_ID = "testsession" - -@pytest.mark.asyncio -async def test_tracing(): +def init_exporters(): cozeloop_exporter = CozeloopExporter( config=CozeloopExporterConfig( endpoint="http://localhost:8000", @@ -61,8 +66,18 @@ async def test_tracing(): secret_key="test_secret_key", ) ) + return [cozeloop_exporter, apmplus_exporter, tls_exporter] - exporters = [cozeloop_exporter, apmplus_exporter, tls_exporter] +def gen_span_processor(endpoint: str): + otlp_exporter = OTLPSpanExporter( + endpoint=endpoint, + ) + span_processor = BatchSpanProcessor(otlp_exporter) + return span_processor + +@pytest.mark.asyncio +async def test_tracing(): + exporters = init_exporters() tracer = OpentelemetryTracer(exporters=exporters) assert len(tracer.exporters) == 5 # with extra 2 built-in exporters @@ -70,3 +85,33 @@ async def test_tracing(): # TODO: Ensure the tracing provider is set correctly after loading SDK # TODO: Ensure the tracing provider is set correctly after loading SDK # TODO: Ensure the tracing provider is set correctly after loading SDK + +@pytest.mark.asyncio +async def test_tracing_with_global_provider(): + exporters = init_exporters() + # set global tracer provider before init OpentelemetryTracer + trace_api.set_tracer_provider(trace_sdk.TracerProvider()) + tracer_provider = trace_api.get_tracer_provider() + tracer_provider.add_span_processor(gen_span_processor("http://localhost:8000")) + trace_api.set_tracer_provider(tracer_provider) + # + tracer = OpentelemetryTracer(exporters=exporters) + + assert len(tracer.exporters) == 5 # with extra 2 built-in exporters + + +@pytest.mark.asyncio +async def test_tracing_with_apmplus_global_provider(): + exporters = init_exporters() + # add apmplus exporter to global tracer provider before init OpentelemetryTracer + trace_api.set_tracer_provider(trace_sdk.TracerProvider()) + tracer_provider = trace_api.get_tracer_provider() + tracer_provider.add_span_processor(gen_span_processor("http://apmplus-region.com")) + + # init OpentelemetryTracer + tracer = OpentelemetryTracer(exporters=exporters) + + # apmplus exporter won't init again + assert len(tracer.exporters) == 4 # with extra 2 built-in exporters + + diff --git a/veadk/tracing/telemetry/opentelemetry_tracer.py b/veadk/tracing/telemetry/opentelemetry_tracer.py index a1b1da1c..f3a0e1e7 100644 --- a/veadk/tracing/telemetry/opentelemetry_tracer.py +++ b/veadk/tracing/telemetry/opentelemetry_tracer.py @@ -20,13 +20,17 @@ from openinference.instrumentation.google_adk import GoogleADKInstrumentor from opentelemetry.sdk import trace as trace_sdk +from opentelemetry import trace as trace_api from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor + from pydantic import BaseModel, ConfigDict, Field from typing_extensions import override from veadk.tracing.base_tracer import BaseTracer from veadk.tracing.telemetry.exporters.apiserver_exporter import ApiServerExporter +from veadk.tracing.telemetry.exporters.apmplus_exporter import APMPlusExporter from veadk.tracing.telemetry.exporters.base_exporter import BaseExporter from veadk.tracing.telemetry.exporters.inmemory_exporter import InMemoryExporter from veadk.tracing.telemetry.metrics.opentelemetry_metrics import MeterUploader @@ -60,7 +64,6 @@ def model_post_init(self, context: Any, /) -> None: self._processors = [] self._inmemory_exporter: InMemoryExporter = None self._apiserver_exporter: ApiServerExporter = None - # Inmemory & APIServer are the default exporters have_inmemory_exporter = False have_apiserver_exporter = False @@ -81,17 +84,6 @@ def model_post_init(self, context: Any, /) -> None: self.exporters.append(apiserver_exporter) self._apiserver_exporter = apiserver_exporter - tracer_provider = trace_sdk.TracerProvider() - for exporter in self.exporters: - processor, resource_attributes = exporter.get_processor() - if resource_attributes is not None: - update_resource_attributions(tracer_provider, resource_attributes) - tracer_provider.add_span_processor(processor) - - logger.debug(f"Add exporter `{exporter.__class__.__name__}` to tracing.") - self._processors.append(processor) - logger.debug(f"Init OpentelemetryTracer with {len(self.exporters)} exporters.") - self._meter_contexts = [] self._meter_uploaders = [] for exporter in self.exporters: @@ -103,13 +95,55 @@ def model_post_init(self, context: Any, /) -> None: meter_uploader = MeterUploader(meter_context) self._meter_uploaders.append(meter_uploader) + # init tracer provider + self._init_tracer_provider() + # just for debug self._trace_file_path = "" # patch this before starting instrumentation # enable_veadk_tracing(self.dump) - GoogleADKInstrumentor().instrument(tracer_provider=tracer_provider) + GoogleADKInstrumentor().instrument() + + def _init_tracer_provider(self): + # 1. get global trace provider + global_tracer_provider = trace_api.get_tracer_provider() + + if not isinstance(global_tracer_provider, TracerProvider): + logger.info( + f"Global tracer provider has not been set. Create tracer provider and set it now." + ) + # 1.1 init tracer provider + tracer_provider = trace_sdk.TracerProvider() + trace_api.set_tracer_provider(tracer_provider) + global_tracer_provider = trace_api.get_tracer_provider() + + have_apmplus_exporter = False + # 2. check if apmplus exporter is already exist + for processor in global_tracer_provider._active_span_processor._span_processors: + if isinstance(processor, (BatchSpanProcessor, SimpleSpanProcessor)): + # check exporter endpoint + if "apmplus" in processor.span_exporter._endpoint: + have_apmplus_exporter = True + + # 3. add exporters to global tracer_provider + # range over a copy of exporters to avoid index issues + for exporter in self.exporters[:]: + if have_apmplus_exporter and isinstance(exporter, APMPlusExporter): + # apmplus exporter has been int in global tracer provider, need to remove from exporters. + self.exporters.remove(exporter) + continue + processor, resource_attributes = exporter.get_processor() + if resource_attributes is not None: + update_resource_attributions( + global_tracer_provider, resource_attributes + ) + global_tracer_provider.add_span_processor(processor) + logger.debug(f"Add exporter `{exporter.__class__.__name__}` to tracing.") + self._processors.append(processor) + logger.debug(f"Init OpentelemetryTracer with {len(self.exporters)} exporters.") + @override def dump(