Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions tests/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,9 @@ async def test_tracing():
exporters = init_exporters()
tracer = OpentelemetryTracer(exporters=exporters)

assert len(tracer.exporters) == 5 # with extra 2 built-in exporters
assert len(tracer.exporters) == 4 # with extra 1 built-in exporters

# TODO: Ensure the tracing provider is set correctly after loading SDK
# TODO: Ensure the tracing provider is set correctly after loading SDK
# TODO: Ensure the tracing provider is set correctly after loading SDK


@pytest.mark.asyncio
Expand All @@ -100,7 +98,7 @@ async def test_tracing_with_global_provider():
#
tracer = OpentelemetryTracer(exporters=exporters)

assert len(tracer.exporters) == 5 # with extra 2 built-in exporters
assert len(tracer.exporters) == 4 # with extra 1 built-in exporters


@pytest.mark.asyncio
Expand All @@ -115,4 +113,4 @@ async def test_tracing_with_apmplus_global_provider():
tracer = OpentelemetryTracer(exporters=exporters)

# apmplus exporter won't init again
assert len(tracer.exporters) == 4 # with extra 2 built-in exporters
assert len(tracer.exporters) == 3 # with extra 1 built-in exporters
6 changes: 2 additions & 4 deletions veadk/evaluation/deepeval_evaluator/deepeval_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def __init__(
"https://ark.cn-beijing.volces.com/api/v3/",
),
name: str = "veadk_deepeval_evaluator",
prometheus_config: PrometheusPushgatewayConfig = None,
prometheus_config: PrometheusPushgatewayConfig | None = None,
):
super().__init__(agent=agent, name=name)

Expand All @@ -73,11 +73,9 @@ async def eval(
eval_id: str = f"test_{formatted_timestamp()}",
):
"""Target to Google ADK, we will use the same evaluation case format as Google ADK."""
for metric in metrics:
if not metric.model:
metric.model = self.judge_model
# Get evaluation data by parsing eval set file
self.generate_eval_data(eval_set_file_path)

# Get actual data by running agent
logger.info("Start to run agent for actual data.")
await self._run_agent_for_actual_data()
Expand Down
3 changes: 0 additions & 3 deletions veadk/tracing/telemetry/exporters/inmemory_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,3 @@ def get_processor(self):
self._real_exporter = self._exporter
processor = export.SimpleSpanProcessor(self._exporter)
return processor, None

def get_meter_context(self) -> Any:
return None
62 changes: 33 additions & 29 deletions veadk/tracing/telemetry/opentelemetry_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,19 @@ class OpentelemetryTracer(BaseModel, BaseTracer):

def model_post_init(self, context: Any, /) -> None:
self._processors = []
self._inmemory_exporter: InMemoryExporter = None
self._apiserver_exporter: ApiServerExporter = None
# Inmemory & APIServer are the default exporters
have_inmemory_exporter = False
have_apiserver_exporter = False
self._inmemory_exporter: InMemoryExporter | None = None

# InMemoryExporter is a default exporter for exporting local tracing file
for exporter in self.exporters:
if isinstance(exporter, InMemoryExporter):
have_inmemory_exporter = True
self._inmemory_exporter = exporter
elif isinstance(exporter, ApiServerExporter):
have_apiserver_exporter = True
self._apiserver_exporter = exporter

if not have_inmemory_exporter:
inmemory_exporter = InMemoryExporter()
self.exporters.append(inmemory_exporter)
self._inmemory_exporter = inmemory_exporter
if not have_apiserver_exporter:
apiserver_exporter = ApiServerExporter()
self.exporters.append(apiserver_exporter)
self._apiserver_exporter = apiserver_exporter

if self._inmemory_exporter is None:
self._inmemory_exporter = InMemoryExporter()
self.exporters.append(self._inmemory_exporter)
# ========================================================================

# Process meter-related attributes
self._meter_contexts = []
self._meter_uploaders = []
for exporter in self.exporters:
Expand All @@ -93,19 +84,18 @@ def model_post_init(self, context: Any, /) -> None:
for meter_context in self._meter_contexts:
meter_uploader = MeterUploader(meter_context)
self._meter_uploaders.append(meter_uploader)
# ================================

# init tracer provider
# VeADK operates on global OpenTelemetry provider, hence return nothing
self._init_tracer_provider()

# just for debug
self._trace_file_path = ""

# patch this before starting instrumentation
# enable_veadk_tracing(self.dump)

GoogleADKInstrumentor().instrument()

def _init_tracer_provider(self):
def _init_tracer_provider(self) -> None:
# 1. get global trace provider
global_tracer_provider = trace_api.get_tracer_provider()

Expand All @@ -122,17 +112,25 @@ def _init_tracer_provider(self):
# 2. check if apmplus exporter is already exist
for processor in global_tracer_provider._active_span_processor._span_processors:
if isinstance(processor, (BatchSpanProcessor, SimpleSpanProcessor)):
# check exporter endpoint
if "apmplus" in processor.span_exporter._endpoint:
have_apmplus_exporter = True
# try to get endpoint, in case of exporter has no _endpoint attribute
try:
exporter_endpoint = processor.span_exporter._endpoint
if "apmplus" in exporter_endpoint:
have_apmplus_exporter = True
except AttributeError:
# log a warning and pass this exporter
logger.warning(
f"Exporter {processor.span_exporter} has no endpoint."
)

# 3. add exporters to global tracer_provider
# range over a copy of exporters to avoid index issues
if have_apmplus_exporter:
self.exporters = [
e for e in self.exporters if not isinstance(e, APMPlusExporter)
]

for exporter in self.exporters[:]:
if have_apmplus_exporter and isinstance(exporter, APMPlusExporter):
# apmplus exporter has been int in global tracer provider, need to remove from exporters.
self.exporters.remove(exporter)
continue
processor, resource_attributes = exporter.get_processor()
if resource_attributes is not None:
update_resource_attributions(
Expand All @@ -150,6 +148,12 @@ def dump(
session_id: str,
path: str = "/tmp",
) -> str:
if not self._inmemory_exporter:
logger.warning(
"InMemoryExporter is not initialized. Please check your tracer exporters."
)
return ""

prompt_tokens = self._inmemory_exporter._real_exporter.prompt_tokens
completion_tokens = self._inmemory_exporter._real_exporter.completion_tokens

Expand Down