diff --git a/tests/test_agent.py b/tests/test_agent.py index 622e5caa..2843d03e 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -48,7 +48,3 @@ def test_agent(): assert agent.long_term_memory.backend == "local" assert load_memory in agent.tools - - assert tracer.tracer_hook_before_model in agent.before_model_callback - assert tracer.tracer_hook_after_model in agent.after_model_callback - assert tracer.tracer_hook_after_tool in agent.after_tool_callback diff --git a/tests/test_tracing.py b/tests/test_tracing.py index 635cfd45..333945d4 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -82,7 +82,7 @@ async def test_tracing(): exporters = init_exporters() tracer = OpentelemetryTracer(exporters=exporters) - assert len(tracer.exporters) == 4 # with extra 1 built-in exporters + assert len(tracer.exporters) == 3 # TODO: Ensure the tracing provider is set correctly after loading SDK @@ -98,7 +98,7 @@ async def test_tracing_with_global_provider(): # tracer = OpentelemetryTracer(exporters=exporters) - assert len(tracer.exporters) == 4 # with extra 1 built-in exporters + assert len(tracer.exporters) == 3 # with extra 1 built-in exporters @pytest.mark.asyncio @@ -113,4 +113,4 @@ async def test_tracing_with_apmplus_global_provider(): tracer = OpentelemetryTracer(exporters=exporters) # apmplus exporter won't init again - assert len(tracer.exporters) == 3 # with extra 1 built-in exporters + assert len(tracer.exporters) == 2 # with extra 1 built-in exporters diff --git a/veadk/agent.py b/veadk/agent.py index 8a8f756d..0da0d749 100644 --- a/veadk/agent.py +++ b/veadk/agent.py @@ -112,10 +112,6 @@ def model_post_init(self, __context: Any) -> None: self.tools.append(load_memory) - if self.tracers: - for tracer in self.tracers: - tracer.do_hooks(self) - logger.info(f"{self.__class__.__name__} `{self.name}` init done.") logger.debug( f"Agent: {self.model_dump(include={'name', 'model_name', 'model_api_base', 'tools', 'serve_url'})}" @@ -215,9 +211,6 @@ async 
def run( session_service=session_service, memory_service=self.long_term_memory, ) - if getattr(self, "tracers", None): - for tracer in self.tracers: - tracer.set_app_name(app_name) logger.info(f"Begin to process prompt {prompt}") # run diff --git a/veadk/agents/loop_agent.py b/veadk/agents/loop_agent.py index b7bbde3c..33d2494e 100644 --- a/veadk/agents/loop_agent.py +++ b/veadk/agents/loop_agent.py @@ -19,7 +19,6 @@ from pydantic import ConfigDict, Field from typing_extensions import Any -from veadk.agent import Agent from veadk.prompts.agent_default_prompt import DEFAULT_DESCRIPTION, DEFAULT_INSTRUCTION from veadk.tracing.base_tracer import BaseTracer from veadk.utils.logger import get_logger @@ -50,21 +49,7 @@ class LoopAgent(GoogleADKLoopAgent): tracers: list[BaseTracer] = [] """The tracers provided to agent.""" - def set_sub_agents_tracer(self, tracer) -> None: - from veadk.agents.parallel_agent import ParallelAgent - from veadk.agents.sequential_agent import SequentialAgent - - for sub_agent in self.sub_agents: - if isinstance(sub_agent, Agent): - tracer.do_hooks(sub_agent) - elif isinstance(sub_agent, (SequentialAgent, LoopAgent, ParallelAgent)): - sub_agent.set_sub_agents_tracer(tracer) - def model_post_init(self, __context: Any) -> None: super().model_post_init(None) # for sub_agents init - if self.tracers: - for tracer in self.tracers: - self.set_sub_agents_tracer(tracer) - logger.info(f"{self.__class__.__name__} `{self.name}` init done.") diff --git a/veadk/agents/parallel_agent.py b/veadk/agents/parallel_agent.py index 486c138a..d13b3a95 100644 --- a/veadk/agents/parallel_agent.py +++ b/veadk/agents/parallel_agent.py @@ -19,7 +19,6 @@ from pydantic import ConfigDict, Field from typing_extensions import Any -from veadk.agent import Agent from veadk.prompts.agent_default_prompt import DEFAULT_DESCRIPTION, DEFAULT_INSTRUCTION from veadk.tracing.base_tracer import BaseTracer from veadk.utils.logger import get_logger @@ -50,16 +49,6 @@ class 
ParallelAgent(GoogleADKParallelAgent): tracers: list[BaseTracer] = [] """The tracers provided to agent.""" - def set_sub_agents_tracer(self, tracer) -> None: - from veadk.agents.loop_agent import LoopAgent - from veadk.agents.sequential_agent import SequentialAgent - - for sub_agent in self.sub_agents: - if isinstance(sub_agent, Agent): - tracer.do_hooks(sub_agent) - elif isinstance(sub_agent, (SequentialAgent, LoopAgent, ParallelAgent)): - sub_agent.set_sub_agents_tracer(tracer) - def model_post_init(self, __context: Any) -> None: super().model_post_init(None) # for sub_agents init @@ -67,7 +56,5 @@ def model_post_init(self, __context: Any) -> None: logger.warning( "Enable tracing in ParallelAgent may cause OpenTelemetry context error. Issue see https://github.com/google/adk-python/issues/1670" ) - for tracer in self.tracers: - self.set_sub_agents_tracer(tracer) logger.info(f"{self.__class__.__name__} `{self.name}` init done.") diff --git a/veadk/agents/sequential_agent.py b/veadk/agents/sequential_agent.py index b7ac4b1a..cf4f5146 100644 --- a/veadk/agents/sequential_agent.py +++ b/veadk/agents/sequential_agent.py @@ -19,7 +19,6 @@ from pydantic import ConfigDict, Field from typing_extensions import Any -from veadk.agent import Agent from veadk.prompts.agent_default_prompt import DEFAULT_DESCRIPTION, DEFAULT_INSTRUCTION from veadk.tracing.base_tracer import BaseTracer from veadk.utils.logger import get_logger @@ -50,21 +49,7 @@ class SequentialAgent(GoogleADKSequentialAgent): tracers: list[BaseTracer] = [] """The tracers provided to agent.""" - def set_sub_agents_tracer(self, tracer) -> None: - from veadk.agents.loop_agent import LoopAgent - from veadk.agents.parallel_agent import ParallelAgent - - for sub_agent in self.sub_agents: - if isinstance(sub_agent, Agent): - tracer.do_hooks(sub_agent) - elif isinstance(sub_agent, (SequentialAgent, LoopAgent, ParallelAgent)): - sub_agent.set_sub_agents_tracer(tracer) - def model_post_init(self, __context: Any) -> None: 
super().model_post_init(None) # for sub_agents init - if self.tracers: - for tracer in self.tracers: - self.set_sub_agents_tracer(tracer) - logger.info(f"{self.__class__.__name__} `{self.name}` init done.") diff --git a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/app.py b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/app.py index 4841680d..27d2000e 100644 --- a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/app.py +++ b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/app.py @@ -70,11 +70,8 @@ def load_tracer() -> None: else: exporters.append(exporter_cls()) - tracer = OpentelemetryTracer( - name="veadk_tracer", app_name=agent_run_config.app_name, exporters=exporters - ) + tracer = OpentelemetryTracer(name="veadk_tracer", exporters=exporters) agent_run_config.agent.tracers.extend([tracer]) - tracer.do_hooks(agent=agent_run_config.agent) def build_mcp_run_agent_func() -> Callable: diff --git a/veadk/runner.py b/veadk/runner.py index bee47251..06f49038 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -27,7 +27,6 @@ from veadk.agents.sequential_agent import SequentialAgent from veadk.evaluation import EvalSetRecorder from veadk.memory.short_term_memory import ShortTermMemory -from veadk.tracing.base_tracer import UserMessagePlugin from veadk.types import MediaMessage from veadk.utils.logger import get_logger from veadk.utils.misc import read_png_to_bytes @@ -68,22 +67,9 @@ def __init__( # prevent VeRemoteAgent has no long-term memory attr if isinstance(self.agent, Agent): self.long_term_memory = self.agent.long_term_memory - for tracer in self.agent.tracers: - tracer.set_app_name(self.app_name) else: self.long_term_memory = None - # process plugins - try: - # try to detect tracer - _ = self.agent.tracers[0] - if not plugins: - plugins = [UserMessagePlugin(name="user_message_plugin")] - else: - plugins.append(UserMessagePlugin(name="user_message_plugin")) - except 
Exception: - logger.debug("Agent has no tracers, telemetry plugin not added.") - self.runner = ADKRunner( app_name=self.app_name, agent=self.agent, @@ -187,7 +173,27 @@ async def run( return final_output - def _print_trace_id(self): + def get_trace_id(self) -> str: + if not isinstance(self.agent, Agent): + logger.warning( + ("The agent is not an instance of VeADK Agent, no trace id provided.") + ) + return "" + + if not self.agent.tracers: + logger.warning( + "No tracer is configured in the agent, no trace id provided." + ) + return "" + + try: + trace_id = self.agent.tracers[0].trace_id # type: ignore + return trace_id + except Exception as e: + logger.warning(f"Get tracer id failed as {e}") + return "" + + def _print_trace_id(self) -> None: if not isinstance(self.agent, Agent): logger.warning( ("The agent is not an instance of VeADK Agent, no trace id provided.") @@ -201,7 +207,7 @@ def _print_trace_id(self): return try: - trace_id = self.agent.tracers[0].get_trace_id() # type: ignore + trace_id = self.agent.tracers[0].trace_id # type: ignore logger.info(f"Trace id: {trace_id}") except Exception as e: logger.warning(f"Get tracer id failed as {e}") diff --git a/veadk/tracing/base_tracer.py b/veadk/tracing/base_tracer.py index a3021048..ca47b001 100644 --- a/veadk/tracing/base_tracer.py +++ b/veadk/tracing/base_tracer.py @@ -12,75 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import json from abc import ABC, abstractmethod -from typing import Any, Optional - -from google.adk.agents.callback_context import CallbackContext -from google.adk.agents.invocation_context import InvocationContext -from google.adk.models.llm_request import LlmRequest -from google.adk.models.llm_response import LlmResponse -from google.adk.plugins.base_plugin import BasePlugin -from google.adk.tools import BaseTool, ToolContext -from google.genai import types -from opentelemetry import trace from veadk.utils.logger import get_logger logger = get_logger(__name__) -class UserMessagePlugin(BasePlugin): - def __init__(self, name: str): - super().__init__(name) - - async def on_user_message_callback( - self, - *, - invocation_context: InvocationContext, - user_message: types.Content, - ) -> Optional[types.Content]: - """Callback executed when a user message is received before an invocation starts. - - This callback helps logging and modifying the user message before the - runner starts the invocation. - - Args: - invocation_context: The context for the entire invocation. - user_message: The message content input by user. - - Returns: - An optional `types.Content` to be returned to the ADK. Returning a - value to replace the user message. Returning `None` to proceed - normally. 
- """ - trace.get_tracer("gcp.vertex.agent") - span = trace.get_current_span() - - logger.debug(f"User message plugin works, catch {span}") - span_name = getattr(span, "name", None) - if span_name and span_name.startswith("invocation"): - agent_name = invocation_context.agent.name - invoke_branch = ( - invocation_context.branch if invocation_context.branch else agent_name - ) - current_session = invocation_context.session - - span.set_attribute("app.name", current_session.app_name) - span.set_attribute("user.id", current_session.user_id) - span.set_attribute("session.id", current_session.id) - - span.set_attribute("agent.name", agent_name) - span.set_attribute("invoke.branch", invoke_branch) - span.set_attribute("gen_ai.system", "veadk") - - logger.debug( - f"Add attributes to {span_name}: app_name={current_session.app_name}, user_id={current_session.user_id}, session_id={current_session.id}, agent_name={agent_name}, invoke_branch={invoke_branch}" - ) - - return None - - def replace_bytes_with_empty(data): """ Recursively traverse the data structure and replace all bytes types with empty strings. @@ -102,201 +40,11 @@ def replace_bytes_with_empty(data): class BaseTracer(ABC): def __init__(self, name: str): - self.app_name = "veadk_app_name" - pass + self.name = name + self._trace_id = "" + self._trace_file_path = "" @abstractmethod - def dump(self, user_id: str, session_id: str, path: str = "/tmp") -> str: ... 
- - def tracer_hook_before_model( - self, callback_context: CallbackContext, llm_request: LlmRequest - ) -> Optional[LlmResponse]: - """agent run stage""" - trace.get_tracer("gcp.vertex.agent") - span = trace.get_current_span() - # logger.debug(f"llm_request: {llm_request}") - - req = llm_request.model_dump() - - app_name = getattr(self, "app_name", "veadk_app") - agent_name = callback_context.agent_name - model_name = req.get("model", "unknown") - max_tokens = ( - None - if not req.get("live_connect_config") - else req["live_connect_config"].get("max_output_tokens", None) - ) - temperature = ( - None - if not req.get("live_connect_config") - else req["live_connect_config"].get("temperature", None) - ) - top_p = ( - None - if not req.get("live_connect_config") - else req["live_connect_config"].get("top_p", None) - ) - - attributes = {} - attributes["agent.name"] = agent_name - attributes["app.name"] = app_name - attributes["gen_ai.system"] = "veadk" - if model_name: - attributes["gen_ai.request.model"] = model_name - attributes["gen_ai.response.model"] = ( - model_name # The req model and the resp model should be consistent. 
- ) - attributes["gen_ai.request.type"] = "completion" - if max_tokens: - attributes["gen_ai.request.max_tokens"] = max_tokens - if temperature: - attributes["gen_ai.request.temperature"] = temperature - if top_p: - attributes["gen_ai.request.top_p"] = top_p - - # Print attributes for debugging - # print("Tracing attributes:", attributes) - - # Set all attributes at once if possible, else fallback to individual - if hasattr(span, "set_attributes"): - span.set_attributes(attributes) - else: - # Fallback for OpenTelemetry versions without set_attributes - for k, v in attributes.items(): - span.set_attribute(k, v) - - def tracer_hook_after_model( - self, callback_context: CallbackContext, llm_response: LlmResponse - ) -> Optional[LlmResponse]: - """call llm stage""" - trace.get_tracer("gcp.vertex.agent") - span = trace.get_current_span() - # logger.debug(f"llm_response: {llm_response}") - # logger.debug(f"callback_context: {callback_context}") - - # Refined: collect all attributes, use set_attributes, print for debugging - attributes = {} - - app_name = getattr(self, "app_name", "veadk_app") - agent_name = callback_context.agent_name - attributes["agent.name"] = agent_name - attributes["app.name"] = app_name - attributes["gen_ai.system"] = "veadk" - # prompt - user_content = callback_context.user_content - role = None - content = None - if getattr(user_content, "role", None): - role = getattr(user_content, "role", None) - - if user_content and getattr(user_content, "parts", None): - # content = user_content.model_dump_json(exclude_none=True) - content = user_content.model_dump(exclude_none=True).get("parts", None) - if content: - content = replace_bytes_with_empty(content) - content = json.dumps(content, ensure_ascii=False) if content else None - - if role and content: - attributes["gen_ai.prompt.0.role"] = role - attributes["gen_ai.prompt.0.content"] = content - - # completion - completion_content = getattr(llm_response, "content").model_dump( - exclude_none=True - ) 
- if completion_content: - content = json.dumps( - getattr(llm_response, "content").model_dump(exclude_none=True)["parts"] - ) - role = getattr(llm_response, "content").model_dump(exclude_none=True)[ - "role" - ] - if role and content: - attributes["gen_ai.completion.0.role"] = role - attributes["gen_ai.completion.0.content"] = content - - if not llm_response.usage_metadata: - return - - # tokens - metadata = llm_response.usage_metadata.model_dump() - if metadata: - prompt_tokens = metadata.get("prompt_token_count", None) - completion_tokens = metadata.get("candidates_token_count", None) - total_tokens = metadata.get("total_token_count", None) - cache_read_input_tokens = ( - metadata.get("cache_read_input_tokens") or 0 - ) # Might change, once openai introduces their equivalent. - cache_create_input_tokens = ( - metadata.get("cache_create_input_tokens") or 0 - ) # Might change, once openai introduces their equivalent. - if prompt_tokens: - attributes["gen_ai.usage.prompt_tokens"] = prompt_tokens - if completion_tokens: - attributes["gen_ai.usage.completion_tokens"] = completion_tokens - if total_tokens: - attributes["gen_ai.usage.total_tokens"] = total_tokens - if cache_read_input_tokens is not None: - attributes["gen_ai.usage.cache_read_input_tokens"] = ( - cache_read_input_tokens - ) - if cache_create_input_tokens is not None: - attributes["gen_ai.usage.cache_create_input_tokens"] = ( - cache_create_input_tokens - ) - - # Print attributes for debugging - # print("Tracing attributes:", attributes) - - # Set all attributes at once if possible, else fallback to individual - if hasattr(span, "set_attributes"): - span.set_attributes(attributes) - else: - # Fallback for OpenTelemetry versions without set_attributes - for k, v in attributes.items(): - span.set_attribute(k, v) - - def tracer_hook_after_tool( - self, - tool: BaseTool, - args: dict[str, Any], - tool_context: ToolContext, - tool_response: dict, - ): - trace.get_tracer("gcp.vertex.agent") - span = 
trace.get_current_span() - agent_name = tool_context.agent_name - tool_name = tool.name - app_name = getattr(self, "app_name", "veadk_app") - attributes = { - "agent.name": agent_name, - "app.name": app_name, - "tool.name": tool_name, - "gen_ai.system": "veadk", - } - - # Set all attributes at once if possible, else fallback to individual - if hasattr(span, "set_attributes"): - span.set_attributes(attributes) - else: - # Fallback for OpenTelemetry versions without set_attributes - for k, v in attributes.items(): - span.set_attribute(k, v) - - def set_app_name(self, app_name): - self.app_name = app_name - - def do_hooks(self, agent) -> None: - if not getattr(agent, "before_model_callback", None): - agent.before_model_callback = [] - if not getattr(agent, "after_model_callback", None): - agent.after_model_callback = [] - if not getattr(agent, "after_tool_callback", None): - agent.after_tool_callback = [] - - if self.tracer_hook_before_model not in agent.before_model_callback: - agent.before_model_callback.append(self.tracer_hook_before_model) - if self.tracer_hook_after_model not in agent.after_model_callback: - agent.after_model_callback.append(self.tracer_hook_after_model) - if self.tracer_hook_after_tool not in agent.after_tool_callback: - agent.after_tool_callback.append(self.tracer_hook_after_tool) + def dump(self, user_id: str, session_id: str, path: str = "/tmp") -> str: + """Dump the trace data to a local file.""" + ... diff --git a/veadk/tracing/telemetry/metrics/__init__.py b/veadk/tracing/telemetry/attributes/attributes.py similarity index 59% rename from veadk/tracing/telemetry/metrics/__init__.py rename to veadk/tracing/telemetry/attributes/attributes.py index 7f463206..19ced9d9 100644 --- a/veadk/tracing/telemetry/metrics/__init__.py +++ b/veadk/tracing/telemetry/attributes/attributes.py @@ -11,3 +11,19 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. + +from veadk.tracing.telemetry.attributes.extractors.common_attributes_extractors import ( + COMMON_ATTRIBUTES, +) +from veadk.tracing.telemetry.attributes.extractors.llm_attributes_extractors import ( + LLM_ATTRIBUTES, +) +from veadk.tracing.telemetry.attributes.extractors.tool_attributes_extractors import ( + TOOL_ATTRIBUTES, +) + +ATTRIBUTES = { + "common": COMMON_ATTRIBUTES, + "llm": LLM_ATTRIBUTES, + "tool": TOOL_ATTRIBUTES, +} diff --git a/veadk/tracing/telemetry/attributes/extractors/common_attributes_extractors.py b/veadk/tracing/telemetry/attributes/extractors/common_attributes_extractors.py new file mode 100644 index 00000000..cb5b73f1 --- /dev/null +++ b/veadk/tracing/telemetry/attributes/extractors/common_attributes_extractors.py @@ -0,0 +1,71 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from veadk.version import VERSION + + +def common_gen_ai_system(**kwargs) -> str: + """This field will be parsed as `model_provider` in Volcengine CozeLoop platform.""" + model_provider = kwargs.get("model_provider") + return model_provider or "" + + +def common_gen_ai_system_version(**kwargs) -> str: + return VERSION + + +def common_gen_ai_app_name(**kwargs) -> str: + app_name = kwargs.get("app_name") + return app_name or "" + + +def common_gen_ai_agent_name(**kwargs) -> str: + agent_name = kwargs.get("agent_name") + return agent_name or "" + + +def common_gen_ai_user_id(**kwargs) -> str: + user_id = kwargs.get("user_id") + return user_id or "" + + +def common_gen_ai_session_id(**kwargs) -> str: + session_id = kwargs.get("session_id") + return session_id or "" + + +def common_cozeloop_report_source(**kwargs) -> str: + return "veadk" + + +def llm_openinference_instrumentation_veadk(**kwargs) -> str: + return VERSION + + +COMMON_ATTRIBUTES = { + "gen_ai.system": common_gen_ai_system, + "gen_ai.system.version": common_gen_ai_system_version, + "gen_ai.agent.name": common_gen_ai_agent_name, + "openinference.instrumentation.veadk": llm_openinference_instrumentation_veadk, + "gen_ai.app.name": common_gen_ai_app_name, # APMPlus required + "gen_ai.user.id": common_gen_ai_user_id, # APMPlus required + "gen_ai.session.id": common_gen_ai_session_id, # APMPlus required + "agent_name": common_gen_ai_agent_name, # CozeLoop required + "agent.name": common_gen_ai_agent_name, # TLS required + "app_name": common_gen_ai_app_name, # CozeLoop required + "app.name": common_gen_ai_app_name, # TLS required + "user.id": common_gen_ai_user_id, # CozeLoop / TLS required + "session.id": common_gen_ai_session_id, # CozeLoop / TLS required + "cozeloop.report.source": common_cozeloop_report_source, # CozeLoop required +} diff --git a/veadk/tracing/telemetry/attributes/extractors/llm_attributes_extractors.py b/veadk/tracing/telemetry/attributes/extractors/llm_attributes_extractors.py new file 
mode 100644 index 00000000..39828a19 --- /dev/null +++ b/veadk/tracing/telemetry/attributes/extractors/llm_attributes_extractors.py @@ -0,0 +1,392 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +from veadk.tracing.telemetry.attributes.extractors.types import ( + ExtractorResponse, + LLMAttributesParams, +) + + +def llm_gen_ai_request_model(params: LLMAttributesParams) -> ExtractorResponse: + return ExtractorResponse(content=params.llm_request.model or "") + + +def llm_gen_ai_request_type(params: LLMAttributesParams) -> ExtractorResponse: + return ExtractorResponse(content="chat" or "") + + +def llm_gen_ai_response_model(params: LLMAttributesParams) -> ExtractorResponse: + return ExtractorResponse(content=params.llm_request.model or "") + + +def llm_gen_ai_request_max_tokens(params: LLMAttributesParams) -> ExtractorResponse: + return ExtractorResponse(content=params.llm_request.config.max_output_tokens) + + +def llm_gen_ai_request_temperature(params: LLMAttributesParams) -> ExtractorResponse: + return ExtractorResponse(content=params.llm_request.config.temperature) + + +def llm_gen_ai_request_top_p(params: LLMAttributesParams) -> ExtractorResponse: + return ExtractorResponse(content=params.llm_request.config.top_p) + + +def llm_gen_ai_response_stop_reason(params: LLMAttributesParams) -> ExtractorResponse: + return ExtractorResponse(content="") + + +def 
llm_gen_ai_response_finish_reason(params: LLMAttributesParams) -> ExtractorResponse: + # TODO: update to google-adk v1.12.0 + return ExtractorResponse(content="") + + +def llm_gen_ai_usage_input_tokens(params: LLMAttributesParams) -> ExtractorResponse: + if params.llm_response.usage_metadata: + return ExtractorResponse( + content=params.llm_response.usage_metadata.prompt_token_count + ) + return ExtractorResponse(content=None) + + +def llm_gen_ai_usage_output_tokens(params: LLMAttributesParams) -> ExtractorResponse: + if params.llm_response.usage_metadata: + return ExtractorResponse( + content=params.llm_response.usage_metadata.candidates_token_count, + ) + return ExtractorResponse(content=None) + + +def llm_gen_ai_usage_total_tokens(params: LLMAttributesParams) -> ExtractorResponse: + if params.llm_response.usage_metadata: + return ExtractorResponse( + content=params.llm_response.usage_metadata.total_token_count, + ) + return ExtractorResponse(content=None) + + +# FIXME +def llm_gen_ai_usage_cache_creation_input_tokens( + params: LLMAttributesParams, +) -> ExtractorResponse: + if params.llm_response.usage_metadata: + return ExtractorResponse( + content=params.llm_response.usage_metadata.cached_content_token_count, + ) + return ExtractorResponse(content=None) + + +# FIXME +def llm_gen_ai_usage_cache_read_input_tokens( + params: LLMAttributesParams, +) -> ExtractorResponse: + if params.llm_response.usage_metadata: + return ExtractorResponse( + content=params.llm_response.usage_metadata.cached_content_token_count, + ) + return ExtractorResponse(content=None) + + +def llm_gen_ai_prompt(params: LLMAttributesParams) -> ExtractorResponse: + # a part is a message + messages: list[dict] = [] + + for content in params.llm_request.contents: + if content.parts: + for idx, part in enumerate(content.parts): + message = {} + # text part + if part.text: + message[f"gen_ai.prompt.{idx}.role"] = content.role + message[f"gen_ai.prompt.{idx}.content"] = part.text + # function 
response + if part.function_response: + message[f"gen_ai.prompt.{idx}.role"] = content.role + message[f"gen_ai.prompt.{idx}.content"] = ( + str(part.function_response.response) + if part.function_response + else "" + ) + # function call + if part.function_call: + message[f"gen_ai.prompt.{idx}.tool_calls.0.id"] = ( + part.function_call.id + if part.function_call.id + else "" + ) + message[f"gen_ai.prompt.{idx}.tool_calls.0.type"] = "function" + message[f"gen_ai.prompt.{idx}.tool_calls.0.function.name"] = ( + part.function_call.name + if part.function_call.name + else "" + ) + message[f"gen_ai.prompt.{idx}.tool_calls.0.function.arguments"] = ( + json.dumps(part.function_call.args) + if part.function_call.args + else json.dumps({}) + ) + + if message: + messages.append(message) + + return ExtractorResponse(content=messages) + + +def llm_gen_ai_completion(params: LLMAttributesParams) -> ExtractorResponse: + messages = [] + + content = params.llm_response.content + if content and content.parts: + for idx, part in enumerate(content.parts): + message = {} + if part.text: + message[f"gen_ai.completion.{idx}.role"] = content.role + message[f"gen_ai.completion.{idx}.content"] = part.text + elif part.function_call: + message[f"gen_ai.completion.{idx}.role"] = content.role + message[f"gen_ai.completion.{idx}.tool_calls.0.id"] = ( + part.function_call.id + if part.function_call.id + else "" + ) + message[f"gen_ai.completion.{idx}.tool_calls.0.type"] = "function" + message[f"gen_ai.completion.{idx}.tool_calls.0.function.name"] = ( + part.function_call.name + if part.function_call.name + else "" + ) + message[f"gen_ai.completion.{idx}.tool_calls.0.function.arguments"] = ( + json.dumps(part.function_call.args) + if part.function_call.args + else json.dumps({}) + ) + + if message: + messages.append(message) + return ExtractorResponse(content=messages) + + +def llm_gen_ai_is_streaming(params: LLMAttributesParams) -> ExtractorResponse: + # return params.llm_request.stream + return 
ExtractorResponse(content=None) + + +def llm_gen_ai_operation_name(params: LLMAttributesParams) -> ExtractorResponse: + return ExtractorResponse(content="chat") + + +def llm_gen_ai_system_message(params: LLMAttributesParams) -> ExtractorResponse: + event_attributes = { + "content": str(params.llm_request.config.system_instruction), + "role": "system", + } + return ExtractorResponse(type="event", content=event_attributes) + + +def llm_gen_ai_user_message(params: LLMAttributesParams) -> ExtractorResponse: + # a content is a message + messages = [] + + for content in params.llm_request.contents: + if content.role == "user": + message_parts = [] + + if content.parts: + if len(content.parts) == 1: + if content.parts[0].text: + message_parts.append( + { + "role": content.role, + "content": content.parts[0].text, + } + ) + elif content.parts[0].function_response: + message_parts.append( + { + "role": content.role, + "content": str( + content.parts[0].function_response.response + ), + } + ) + else: + message_part = {"role": content.role} + for idx, part in enumerate(content.parts): + # text part + if part.text: + message_part[f"parts.{idx}.type"] = "text" + message_part[f"parts.{idx}.content"] = part.text + # function response + if part.function_response: + message_part[f"parts.{idx}.type"] = "function" + message_part[f"parts.{idx}.content"] = str( + part.function_response + ) + + message_parts.append(message_part) + + if message_parts: + messages.extend(message_parts) + + return ExtractorResponse(type="event", content=messages) + + +def llm_gen_ai_assistant_message(params: LLMAttributesParams) -> ExtractorResponse: + # a content is a message + messages = [] + + # each part in each content we make it a message + # e.g. 
2 contents and 3 parts each means 6 messages + for content in params.llm_request.contents: + if content.role == "model": + message_parts = [] + + # each part we make it a message + if content.parts: + # only one part + if len(content.parts) == 1: + if content.parts[0].text: + message_parts.append( + { + "role": content.role, + "content": content.parts[0].text, + } + ) + elif content.parts[0].function_call: + pass + # multiple parts + else: + message_part = {"role": content.role} + + for idx, part in enumerate(content.parts): + # parse content + if part.text: + message_part[f"parts.{idx}.type"] = "text" + message_part[f"parts.{idx}.content"] = part.text + # parse tool_calls + if part.function_call: + message_part["tool_calls.0.id"] = ( + part.function_call.id + if part.function_call.id + else "" + ) + message_part["tool_calls.0.type"] = "function" + message_part["tool_calls.0.function.name"] = ( + part.function_call.name + if part.function_call.name + else "" + ) + message_part["tool_calls.0.function.arguments"] = ( + json.dumps(part.function_call.args) + if part.function_call.args + else json.dumps({}) + ) + message_parts.append(message_part) + + if message_parts: + messages.extend(message_parts) + + return ExtractorResponse(type="event", content=messages) + + +def llm_gen_ai_choice(params: LLMAttributesParams) -> ExtractorResponse: + message = {} + + # parse content to build a message + content = params.llm_response.content + if content and content.parts: + message = {"message.role": content.role} + + if len(content.parts) == 1: + part = content.parts[0] + if part.text: + message["message.content"] = part.text + elif part.function_call: + message["message.tool_calls.0.id"] = ( + part.function_call.id + if part.function_call.id + else "" + ) + message["message.tool_calls.0.type"] = "function" + message["message.tool_calls.0.function.name"] = ( + part.function_call.name + if part.function_call.name + else "" + ) + message["message.tool_calls.0.function.arguments"] = 
( + json.dumps(part.function_call.args) + if part.function_call.args + else json.dumps({}) + ) + else: + for idx, part in enumerate(content.parts): + # parse content + if part.text: + message[f"message.parts.{idx}.type"] = "text" + message[f"message.parts.{idx}.text"] = part.text + + # parse tool_calls + if part.function_call: + message["message.tool_calls.0.id"] = ( + part.function_call.id + if part.function_call.id + else "" + ) + message["message.tool_calls.0.type"] = "function" + message["message.tool_calls.0.function.name"] = ( + part.function_call.name + if part.function_call.name + else "" + ) + message["message.tool_calls.0.function.arguments"] = ( + json.dumps(part.function_call.args) + if part.function_call.args + else json.dumps({}) + ) + + return ExtractorResponse(type="event", content=message) + + +LLM_ATTRIBUTES = { + # ===== request attributes ===== + "gen_ai.request.model": llm_gen_ai_request_model, + "gen_ai.request.type": llm_gen_ai_request_type, + "gen_ai.request.max_tokens": llm_gen_ai_request_max_tokens, + "gen_ai.request.temperature": llm_gen_ai_request_temperature, + "gen_ai.request.top_p": llm_gen_ai_request_top_p, + # ===== response attributes ===== + "gen_ai.response.model": llm_gen_ai_response_model, + "gen_ai.response.stop_reason": llm_gen_ai_response_stop_reason, + "gen_ai.response.finish_reason": llm_gen_ai_response_finish_reason, + # ===== streaming ===== + "gen_ai.is_streaming": llm_gen_ai_is_streaming, + # ===== span type ===== + "gen_ai.operation.name": llm_gen_ai_operation_name, + # ===== inputs and outputs ===== + # events + "gen_ai.system.message": llm_gen_ai_system_message, + "gen_ai.user.message": llm_gen_ai_user_message, + "gen_ai.assistant.message": llm_gen_ai_assistant_message, + "gen_ai.choice": llm_gen_ai_choice, + # attributes + "gen_ai.prompt": llm_gen_ai_prompt, + "gen_ai.completion": llm_gen_ai_completion, + # ===== usage ===== + "gen_ai.usage.input_tokens": llm_gen_ai_usage_input_tokens, + 
"gen_ai.usage.output_tokens": llm_gen_ai_usage_output_tokens, + "gen_ai.usage.total_tokens": llm_gen_ai_usage_total_tokens, + "gen_ai.usage.cache_creation_input_tokens": llm_gen_ai_usage_cache_creation_input_tokens, + "gen_ai.usage.cache_read_input_tokens": llm_gen_ai_usage_cache_read_input_tokens, +} diff --git a/veadk/tracing/telemetry/attributes/extractors/tool_attributes_extractors.py b/veadk/tracing/telemetry/attributes/extractors/tool_attributes_extractors.py new file mode 100644 index 00000000..1ebb5d8a --- /dev/null +++ b/veadk/tracing/telemetry/attributes/extractors/tool_attributes_extractors.py @@ -0,0 +1,70 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json + +from veadk.tracing.telemetry.attributes.extractors.types import ( + ExtractorResponse, + ToolAttributesParams, +) + + +def tool_gen_ai_operation_name(params: ToolAttributesParams) -> ExtractorResponse: + return ExtractorResponse(content="execute_tool") + + +def tool_gen_ai_tool_message(params: ToolAttributesParams) -> ExtractorResponse: + tool_input = { + "id": "123", + "role": "tool", + "content": json.dumps( + { + "name": params.tool.name, + "description": params.tool.description, + "parameters": params.args, + } + ), + } + return ExtractorResponse(type="event", content=tool_input) + + +def tool_cozeloop_input(params: ToolAttributesParams) -> ExtractorResponse: + tool_input = { + "name": params.tool.name, + "description": params.tool.description, + "parameters": params.args, + } + return ExtractorResponse(content=json.dumps(tool_input) or "") + + +def tool_gen_ai_tool_name(params: ToolAttributesParams) -> ExtractorResponse: + return ExtractorResponse(content=params.tool.name or "") + + +def tool_cozeloop_output(params: ToolAttributesParams) -> ExtractorResponse: + function_response = params.function_response_event.get_function_responses()[0] + tool_output = { + "id": function_response.id, + "name": function_response.name, + "response": function_response.response, + } + return ExtractorResponse(content=json.dumps(tool_output) or "") + + +TOOL_ATTRIBUTES = { + "gen_ai.operation.name": tool_gen_ai_operation_name, + "gen_ai.tool.name": tool_gen_ai_tool_name, # TLS required + "cozeloop.input": tool_cozeloop_input, # CozeLoop required + "cozeloop.output": tool_cozeloop_output, # CozeLoop required +} diff --git a/veadk/tracing/telemetry/attributes/extractors/types.py b/veadk/tracing/telemetry/attributes/extractors/types.py new file mode 100644 index 00000000..7852f41d --- /dev/null +++ b/veadk/tracing/telemetry/attributes/extractors/types.py @@ -0,0 +1,75 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Literal + +from attr import dataclass +from google.adk.agents.invocation_context import InvocationContext +from google.adk.events import Event +from google.adk.models.llm_request import LlmRequest +from google.adk.models.llm_response import LlmResponse +from google.adk.tools import BaseTool +from opentelemetry.sdk.trace import _Span +from opentelemetry.trace.span import Span + + +@dataclass +class ExtractorResponse: + content: list | dict | None | str | int | float + + type: Literal["attribute", "event"] = "attribute" + """Type of extractor response. + + `attribute`: span.add_attribute(attr_name, attr_value) + `event`: span.add_event(...) + """ + + @staticmethod + def update_span( + span: _Span | Span, attr_name: str, response: "ExtractorResponse" + ) -> None: + if response.type == "attribute": + res = response.content + if isinstance(res, list): # list[dict] + for _res in res: + if isinstance(_res, dict): + for k, v in _res.items(): + span.set_attribute(k, v) + else: + # set anyway + span.set_attribute(attr_name, res) # type: ignore + elif response.type == "event": + if isinstance(response.content, dict): + span.add_event(attr_name, response.content) + elif isinstance(response.content, list): + for event in response.content: + span.add_event(attr_name, event) + else: + # Unsupported response type, discard it. 
+ pass + + +@dataclass +class LLMAttributesParams: + invocation_context: InvocationContext + event_id: str + llm_request: LlmRequest + llm_response: LlmResponse + + +@dataclass +class ToolAttributesParams: + tool: BaseTool + args: dict[str, Any] + function_response_event: Event diff --git a/veadk/tracing/telemetry/exporters/apmplus_exporter.py b/veadk/tracing/telemetry/exporters/apmplus_exporter.py index 2844a6ef..32c53d94 100644 --- a/veadk/tracing/telemetry/exporters/apmplus_exporter.py +++ b/veadk/tracing/telemetry/exporters/apmplus_exporter.py @@ -12,10 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Any + +from google.adk.models.llm_request import LlmRequest +from google.adk.models.llm_response import LlmResponse from opentelemetry import metrics +from opentelemetry import metrics as metrics_api from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.metrics._internal import Meter +from opentelemetry.sdk import metrics as metrics_sdk from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace.export import BatchSpanProcessor @@ -24,12 +30,76 @@ from veadk.config import getenv from veadk.tracing.telemetry.exporters.base_exporter import BaseExporter -from veadk.tracing.telemetry.metrics.opentelemetry_metrics import MeterContext from veadk.utils.logger import get_logger logger = get_logger(__name__) +class MeterUploader: + def __init__( + self, name: str, endpoint: str, headers: dict, resource_attributes: dict + ) -> None: + # global_metrics_provider -> global_tracer_provider + # exporter -> exporter + # metric_reader -> processor + global_metrics_provider = metrics_api.get_meter_provider() + + # 1. 
init resource + if hasattr(global_metrics_provider, "_sdk_config"): + global_resource = global_metrics_provider._sdk_config.resource # type: ignore + else: + global_resource = Resource.create() + + resource = global_resource.merge(Resource.create(resource_attributes)) + + # 2. init exporter and reader + exporter = OTLPMetricExporter(endpoint=endpoint, headers=headers) + metric_reader = PeriodicExportingMetricReader(exporter) + + metrics_api.set_meter_provider( + metrics_sdk.MeterProvider(metric_readers=[metric_reader], resource=resource) + ) + + # 3. init meter + self.meter: Meter = metrics.get_meter(name=name) + + # create meter attributes + self.llm_invoke_counter = self.meter.create_counter( + name="gen_ai.chat.count", + description="Number of LLM invocations", + unit="count", + ) + self.token_usage = self.meter.create_histogram( + name="gen_ai.client.token.usage", + description="Token consumption of LLM invocations", + unit="count", + ) + + def record(self, llm_request: LlmRequest, llm_response: LlmResponse) -> None: + attributes = { + "gen_ai_system": "volcengine", + "gen_ai_response_model": llm_request.model, + "gen_ai_operation_name": "chat_completions", + "stream": "false", + "server_address": "api.volcengine.com", + } # required by Volcengine APMPlus + + if llm_response.usage_metadata: + # llm invocation number += 1 + self.llm_invoke_counter.add(1, attributes) + + # upload token usage + input_token = llm_response.usage_metadata.prompt_token_count + output_token = llm_response.usage_metadata.candidates_token_count + + if input_token: + token_attributes = {**attributes, "gen_ai_token_type": "input"} + self.token_usage.record(input_token, attributes=token_attributes) + if output_token: + token_attributes = {**attributes, "gen_ai_token_type": "output"} + self.token_usage.record(output_token, attributes=token_attributes) + + class APMPlusExporterConfig(BaseModel): endpoint: str = Field( default_factory=lambda: getenv( @@ -49,50 +119,39 @@ class 
APMPlusExporterConfig(BaseModel): ) -class APMPlusExporter(BaseModel, BaseExporter): +class APMPlusExporter(BaseExporter): config: APMPlusExporterConfig = Field(default_factory=APMPlusExporterConfig) - @override - def get_processor(self): - resource_attributes = { - "service.name": self.config.service_name, - } - + def model_post_init(self, context: Any) -> None: headers = { "x-byteapm-appkey": self.config.app_key, } - exporter = OTLPSpanExporter( - endpoint=self.config.endpoint, insecure=True, headers=headers - ) - self._real_exporter = exporter - processor = BatchSpanProcessor(exporter) - return processor, resource_attributes - - def export(self): - self._real_exporter.force_flush() - logger.info( - f"APMPlusExporter exports data to {self.config.endpoint}, service name: {self.config.service_name}" - ) + self.headers |= headers - @override - def get_meter_context(self) -> MeterContext: resource_attributes = { "service.name": self.config.service_name, } - endpoint = self.config.endpoint - headers = { - "x-byteapm-appkey": self.config.app_key, - } + self.resource_attributes |= resource_attributes - resource = Resource.create(resource_attributes) - exporter = OTLPMetricExporter(endpoint=endpoint, headers=headers) - metric_reader = PeriodicExportingMetricReader(exporter) - provider = MeterProvider(metric_readers=[metric_reader], resource=resource) - metrics.set_meter_provider(provider) - meter = metrics.get_meter("my.meter.name") - meter_context = MeterContext( - meter=meter, - provider=provider, - reader=metric_reader, + self._exporter = OTLPSpanExporter( + endpoint=self.config.endpoint, insecure=True, headers=self.headers ) - return meter_context + self.processor = BatchSpanProcessor(self._exporter) + + self.meter_uploader = MeterUploader( + name="apmplus_meter", + endpoint=self.config.endpoint, + headers=self.headers, + resource_attributes=self.resource_attributes, + ) + + @override + def export(self) -> None: + if self._exporter: + self._exporter.force_flush() + + 
logger.info( + f"APMPlusExporter exports data to {self.config.endpoint}, service name: {self.config.service_name}" + ) + else: + logger.warning("APMPlusExporter internal exporter is not initialized.") diff --git a/veadk/tracing/telemetry/exporters/base_exporter.py b/veadk/tracing/telemetry/exporters/base_exporter.py index a5e819f1..7de85965 100644 --- a/veadk/tracing/telemetry/exporters/base_exporter.py +++ b/veadk/tracing/telemetry/exporters/base_exporter.py @@ -12,20 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import ABC, abstractmethod -from typing import Any +from opentelemetry.sdk.trace import SpanProcessor +from opentelemetry.sdk.trace.export import SpanExporter +from pydantic import BaseModel, ConfigDict, Field -class BaseExporter(ABC): - def __init__(self) -> None: - pass +class BaseExporter(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow") - @abstractmethod - def get_processor(self) -> Any: - pass + resource_attributes: dict = Field(default_factory=dict) + headers: dict = Field(default_factory=dict) - def get_meter_context(self) -> Any: - pass + _exporter: SpanExporter | None = None + processor: SpanProcessor | None = None def export(self) -> None: + """Force export of telemetry data.""" pass diff --git a/veadk/tracing/telemetry/exporters/cozeloop_exporter.py b/veadk/tracing/telemetry/exporters/cozeloop_exporter.py index 624808ae..ef4e4ed8 100644 --- a/veadk/tracing/telemetry/exporters/cozeloop_exporter.py +++ b/veadk/tracing/telemetry/exporters/cozeloop_exporter.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from typing import Any + from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.trace.export import BatchSpanProcessor from pydantic import BaseModel, Field @@ -41,26 +43,31 @@ class CozeloopExporterConfig(BaseModel): ) -class CozeloopExporter(BaseModel, BaseExporter): +class CozeloopExporter(BaseExporter): config: CozeloopExporterConfig = Field(default_factory=CozeloopExporterConfig) - @override - def get_processor(self): + def model_post_init(self, context: Any) -> None: headers = { "cozeloop-workspace-id": self.config.space_id, "authorization": f"Bearer {self.config.token}", } - exporter = OTLPSpanExporter( + self.headers |= headers + + self._exporter = OTLPSpanExporter( endpoint=self.config.endpoint, - headers=headers, + headers=self.headers, timeout=10, ) - self._real_exporter = exporter - processor = BatchSpanProcessor(exporter) - return processor, None - def export(self): - self._real_exporter.force_flush() - logger.info( - f"CozeloopExporter exports data to {self.config.endpoint}, space id: {self.config.space_id}" - ) + self.processor = BatchSpanProcessor(self._exporter) + + @override + def export(self) -> None: + """Force export of telemetry data.""" + if self._exporter: + self._exporter.force_flush() + logger.info( + f"CozeloopExporter exports data to {self.config.endpoint}, space id: {self.config.space_id}" + ) + else: + logger.warning("CozeloopExporter internal exporter is not initialized.") diff --git a/veadk/tracing/telemetry/exporters/inmemory_exporter.py b/veadk/tracing/telemetry/exporters/inmemory_exporter.py index 8647f580..37b77acf 100644 --- a/veadk/tracing/telemetry/exporters/inmemory_exporter.py +++ b/veadk/tracing/telemetry/exporters/inmemory_exporter.py @@ -12,11 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import typing -from typing import Any +from typing import Sequence +from opentelemetry.context import ( + _SUPPRESS_INSTRUMENTATION_KEY, + attach, + detach, + set_value, +) from opentelemetry.sdk.trace import ReadableSpan, export -from pydantic import BaseModel from typing_extensions import override from veadk.tracing.telemetry.exporters.base_exporter import BaseExporter @@ -27,20 +31,17 @@ # ======== Adapted from Google ADK ======== class _InMemoryExporter(export.SpanExporter): - def __init__(self, session_trace_dict): + def __init__(self) -> None: super().__init__() self._spans = [] - self.session_trace_dict = session_trace_dict self.trace_id = "" - self.prompt_tokens = [] - self.completion_tokens = [] + self.session_trace_dict = {} @override - def export(self, spans: typing.Sequence[ReadableSpan]) -> export.SpanExportResult: + def export(self, spans: Sequence[ReadableSpan]) -> export.SpanExportResult: for span in spans: if span.context: - trace_id = span.context.trace_id - self.trace_id = trace_id + self.trace_id = span.context.trace_id else: logger.warning( f"Span context is missing, failed to get `trace_id`. 
span: {span}" @@ -48,22 +49,12 @@ def export(self, spans: typing.Sequence[ReadableSpan]) -> export.SpanExportResul if span.name == "call_llm": attributes = dict(span.attributes or {}) - prompt_token = attributes.get("gen_ai.usage.prompt_tokens", None) - completion_token = attributes.get( - "gen_ai.usage.completion_tokens", None - ) - if prompt_token: - self.prompt_tokens.append(prompt_token) - if completion_token: - self.completion_tokens.append(completion_token) - if span.name == "call_llm": - attributes = dict(span.attributes or {}) - session_id = attributes.get("gcp.vertex.agent.session_id", None) + session_id = attributes.get("gen_ai.session.id", None) if session_id: if session_id not in self.session_trace_dict: - self.session_trace_dict[session_id] = [trace_id] + self.session_trace_dict[session_id] = [self.trace_id] else: - self.session_trace_dict[session_id] += [trace_id] + self.session_trace_dict[session_id] += [self.trace_id] self._spans.extend(spans) return export.SpanExportResult.SUCCESS @@ -81,14 +72,37 @@ def clear(self): self._spans.clear() -class InMemoryExporter(BaseModel, BaseExporter): - name: str = "inmemory_exporter" +class _InMemorySpanProcessor(export.SimpleSpanProcessor): + def __init__(self, exporter: _InMemoryExporter) -> None: + super().__init__(exporter) + self.spans = [] - def model_post_init(self, __context: Any) -> None: - self._exporter = _InMemoryExporter({}) + def on_start(self, span, parent_context) -> None: + if span.context: + self.spans.append(span) - @override - def get_processor(self): - self._real_exporter = self._exporter - processor = export.SimpleSpanProcessor(self._exporter) - return processor, None + def on_end(self, span: ReadableSpan) -> None: + if span.context: + if not span.context.trace_flags.sampled: + return + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + try: + self.span_exporter.export((span,)) + # pylint: disable=broad-exception-caught + except Exception: + logger.exception("Exception while 
exporting Span.") + detach(token) + if span in self.spans: + self.spans.remove(span) + + +class InMemoryExporter(BaseExporter): + """InMemory Exporter mainly for store spans in memory for debugging / observability purposes.""" + + def __init__(self, name: str = "inmemory_exporter") -> None: + super().__init__() + + self.name = name + + self._exporter = _InMemoryExporter() + self.processor = _InMemorySpanProcessor(self._exporter) diff --git a/veadk/tracing/telemetry/exporters/tls_exporter.py b/veadk/tracing/telemetry/exporters/tls_exporter.py index 634d6df2..29829205 100644 --- a/veadk/tracing/telemetry/exporters/tls_exporter.py +++ b/veadk/tracing/telemetry/exporters/tls_exporter.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Any + from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.trace.export import BatchSpanProcessor from pydantic import BaseModel, Field @@ -44,28 +46,32 @@ class TLSExporterConfig(BaseModel): secret_key: str = Field(default_factory=lambda: getenv("VOLCENGINE_SECRET_KEY")) -class TLSExporter(BaseModel, BaseExporter): +class TLSExporter(BaseExporter): config: TLSExporterConfig = Field(default_factory=TLSExporterConfig) - @override - def get_processor(self): + def model_post_init(self, context: Any) -> None: headers = { "x-tls-otel-tracetopic": self.config.topic_id, "x-tls-otel-ak": self.config.access_key, "x-tls-otel-sk": self.config.secret_key, "x-tls-otel-region": self.config.region, } - exporter = OTLPSpanExporter( + self.headers |= headers + + self._exporter = OTLPSpanExporter( endpoint=self.config.endpoint, headers=headers, timeout=10, ) - self._real_exporter = exporter - processor = BatchSpanProcessor(exporter) - return processor, None - def export(self): - self._real_exporter.force_flush() - logger.info( - f"TLSExporter exports data to {self.config.endpoint}, topic id: 
{self.config.topic_id}" - ) + self.processor = BatchSpanProcessor(self._exporter) + + @override + def export(self) -> None: + if self._exporter: + self._exporter.force_flush() + logger.info( + f"TLSExporter exports data to {self.config.endpoint}, topic id: {self.config.topic_id}" + ) + else: + logger.warning("TLSExporter internal exporter is not initialized.") diff --git a/veadk/tracing/telemetry/metrics/opentelemetry_metrics.py b/veadk/tracing/telemetry/metrics/opentelemetry_metrics.py deleted file mode 100644 index be8eaa7b..00000000 --- a/veadk/tracing/telemetry/metrics/opentelemetry_metrics.py +++ /dev/null @@ -1,73 +0,0 @@ -# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import time - -from opentelemetry.metrics._internal import Meter -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader - -from veadk.config import getenv - - -class MeterContext: - def __init__( - self, - meter: Meter, - provider: MeterProvider, - reader: PeriodicExportingMetricReader, - ): - self.meter = meter - self.provider = provider - self.reader = reader - - -class MeterUploader: - def __init__(self, meter_context: MeterContext): - self.meter = meter_context.meter - self.provider = meter_context.provider - self.reader = meter_context.reader - - self.base_attributes = { - "gen_ai_system": "volcengine", - "server_address": "api.volcengine.com", - "gen_ai_response_model": getenv("MODEL_AGENT_NAME", "unknown"), - "stream": "false", - "gen_ai_operation_name": "chat_completions", - } - self.llm_invoke_counter = self.meter.create_counter( - name="gen_ai.chat.count", - description="Number of LLM invocations", - unit="count", - ) - self.token_usage = self.meter.create_histogram( - name="gen_ai.client.token.usage", - description="Token consumption of LLM invocations", - unit="count", - ) - - def record(self, prompt_tokens: list[int], completion_tokens: list[int]): - self.llm_invoke_counter.add(len(completion_tokens), self.base_attributes) - - for prompt_token in prompt_tokens: - token_attributes = {**self.base_attributes, "gen_ai_token_type": "input"} - self.token_usage.record(prompt_token, attributes=token_attributes) - for completion_token in completion_tokens: - token_attributes = {**self.base_attributes, "gen_ai_token_type": "output"} - self.token_usage.record(completion_token, attributes=token_attributes) - - def close(self): - time.sleep(0.05) - self.reader.force_flush() - self.provider.shutdown() diff --git a/veadk/tracing/telemetry/opentelemetry_tracer.py b/veadk/tracing/telemetry/opentelemetry_tracer.py index 4159df75..9f1019b9 100644 --- 
a/veadk/tracing/telemetry/opentelemetry_tracer.py +++ b/veadk/tracing/telemetry/opentelemetry_tracer.py @@ -18,96 +18,71 @@ import time from typing import Any -from openinference.instrumentation.google_adk import GoogleADKInstrumentor from opentelemetry import trace as trace_api from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk import trace as trace_sdk from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, ConfigDict, Field, field_validator from typing_extensions import override from veadk.tracing.base_tracer import BaseTracer from veadk.tracing.telemetry.exporters.apmplus_exporter import APMPlusExporter from veadk.tracing.telemetry.exporters.base_exporter import BaseExporter from veadk.tracing.telemetry.exporters.inmemory_exporter import InMemoryExporter -from veadk.tracing.telemetry.metrics.opentelemetry_metrics import MeterUploader from veadk.utils.logger import get_logger +from veadk.utils.patches import patch_google_adk_telemetry logger = get_logger(__name__) -DEFAULT_VEADK_TRACER_NAME = "veadk_global_tracer" - -def update_resource_attributions(provider: TracerProvider, resource_attributes: dict): +def _update_resource_attributions( + provider: TracerProvider, resource_attributes: dict +) -> None: provider._resource = provider._resource.merge(Resource.create(resource_attributes)) class OpentelemetryTracer(BaseModel, BaseTracer): model_config = ConfigDict(arbitrary_types_allowed=True) - exporters: list[BaseExporter] = Field( - default=[], - description="The exporters to export spans.", - ) + name: str = Field( - default=DEFAULT_VEADK_TRACER_NAME, description="The identifier of tracer." + default="veadk_opentelemetry_tracer", description="The identifier of tracer." 
) - app_name: str = Field( - default="veadk_app", - description="The identifier of app.", + exporters: list[BaseExporter] = Field( + default_factory=list, + description="The exporters to export spans.", ) - def model_post_init(self, context: Any, /) -> None: - self._processors = [] - self._inmemory_exporter: InMemoryExporter | None = None - - # InMemoryExporter is a default exporter for exporting local tracing file - for exporter in self.exporters: - if isinstance(exporter, InMemoryExporter): - self._inmemory_exporter = exporter - - if self._inmemory_exporter is None: - self._inmemory_exporter = InMemoryExporter() - self.exporters.append(self._inmemory_exporter) - # ======================================================================== + @field_validator("exporters") + @classmethod + def forbid_inmemory_exporter(cls, v: list[BaseExporter]) -> list[BaseExporter]: + for e in v: + if isinstance(e, InMemoryExporter): + raise ValueError("InMemoryExporter is not allowed in exporters list") + return v - # Process meter-related attributes - self._meter_contexts = [] - self._meter_uploaders = [] - for exporter in self.exporters: - meter_context = exporter.get_meter_context() - if meter_context is not None: - self._meter_contexts.append(meter_context) - - for meter_context in self._meter_contexts: - meter_uploader = MeterUploader(meter_context) - self._meter_uploaders.append(meter_uploader) - # ================================ - - # init tracer provider - # VeADK operates on global OpenTelemetry provider, hence return nothing - self._init_tracer_provider() + def model_post_init(self, context: Any) -> None: + patch_google_adk_telemetry() + self._init_global_tracer_provider() - # just for debug - self._trace_file_path = "" + # GoogleADKInstrumentor().instrument() - GoogleADKInstrumentor().instrument() + def _init_global_tracer_provider(self) -> None: + self._processors = [] - def _init_tracer_provider(self) -> None: - # set provider anyway - # finally, get global provider - 
tracer_provider = trace_sdk.TracerProvider() - trace_api.set_tracer_provider(tracer_provider) + # set provider anyway, then get global provider + trace_api.set_tracer_provider(trace_sdk.TracerProvider()) global_tracer_provider: TracerProvider = trace_api.get_tracer_provider() # type: ignore - have_apmplus_exporter = False - for processor in global_tracer_provider._active_span_processor._span_processors: - if isinstance(processor, (BatchSpanProcessor, SimpleSpanProcessor)): - if isinstance(processor.span_exporter, OTLPSpanExporter): - if "apmplus" in processor.span_exporter._endpoint: - have_apmplus_exporter = True + span_processors = global_tracer_provider._active_span_processor._span_processors + have_apmplus_exporter = any( + isinstance(p, (BatchSpanProcessor, SimpleSpanProcessor)) + and isinstance(p.span_exporter, OTLPSpanExporter) + and "apmplus" in p.span_exporter._endpoint + for p in span_processors + ) if have_apmplus_exporter: self.exporters = [ @@ -115,64 +90,83 @@ def _init_tracer_provider(self) -> None: ] for exporter in self.exporters: - processor, resource_attributes = exporter.get_processor() - if resource_attributes is not None: - update_resource_attributions( + processor = exporter.processor + resource_attributes = exporter.resource_attributes + + if resource_attributes: + _update_resource_attributions( global_tracer_provider, resource_attributes ) - global_tracer_provider.add_span_processor(processor) - logger.debug( - f"Add exporter `{exporter.__class__.__name__}` to OpentelemetryTracer." + + if processor: + global_tracer_provider.add_span_processor(processor) + self._processors.append(processor) + + logger.debug( + f"Add span processor for exporter `{exporter.__class__.__name__}` to OpentelemetryTracer." + ) + else: + logger.error( + f"Add span processor for exporter `{exporter.__class__.__name__}` to OpentelemetryTracer failed." 
+ ) + + self._inmemory_exporter = InMemoryExporter() + if self._inmemory_exporter.processor: + # make sure the in memory exporter processor is added at index 0 + # because we use this to record all spans + global_tracer_provider._active_span_processor._span_processors = ( + self._inmemory_exporter.processor, + ) + global_tracer_provider._active_span_processor._span_processors + + self._processors.append(self._inmemory_exporter.processor) + else: + logger.warning( + "InMemoryExporter processor is not initialized, cannot add to OpentelemetryTracer." ) - self._processors.append(processor) - logger.debug(f"Init OpentelemetryTracer with {len(self.exporters)} exporters.") - def get_trace_id(self) -> str: - if not self._inmemory_exporter: - return "" - try: - trace_id = hex(int(self._inmemory_exporter._real_exporter.trace_id))[2:] - except Exception: - return "" + logger.info(f"Init OpentelemetryTracer with {len(self._processors)} exporters.") + + @property + def trace_file_path(self) -> str: + return self._trace_file_path - return trace_id + @property + def trace_id(self) -> str: + try: + trace_id = hex(int(self._inmemory_exporter._exporter.trace_id))[2:] # type: ignore + return trace_id + except Exception as e: + logger.error(f"Failed to get trace_id from InMemoryExporter: {e}") + return self._trace_id + + def force_export(self) -> None: + """Force to export spans in all processors.""" + for processor in self._processors: + time.sleep(0.05) + processor.force_flush() @override def dump( self, - user_id: str, - session_id: str, + user_id: str = "unknown_user_id", + session_id: str = "unknown_session_id", path: str = "/tmp", ) -> str: + def _build_trace_file_path(path: str, user_id: str, session_id: str) -> str: + return f"{path}/{self.name}_{user_id}_{session_id}_{self.trace_id}.json" + if not self._inmemory_exporter: logger.warning( "InMemoryExporter is not initialized. Please check your tracer exporters." 
) return "" + self.force_export() - prompt_tokens = self._inmemory_exporter._real_exporter.prompt_tokens - completion_tokens = self._inmemory_exporter._real_exporter.completion_tokens - - # upload - for meter_uploader in self._meter_uploaders: - meter_uploader.record( - prompt_tokens=prompt_tokens, completion_tokens=completion_tokens - ) - # clear tokens after dump - self._inmemory_exporter._real_exporter.completion_tokens = [] - self._inmemory_exporter._real_exporter.prompt_tokens = [] - - for processor in self._processors: - time.sleep(0.05) # give some time for the exporter to upload spans - processor.force_flush() - - spans = self._inmemory_exporter._real_exporter.get_finished_spans( + spans = self._inmemory_exporter._exporter.get_finished_spans( # type: ignore session_id=session_id ) - if not spans: - data = [] - else: - data = [ + data = ( + [ { "name": s.name, "span_id": s.context.span_id, @@ -184,24 +178,18 @@ def dump( } for s in spans ] + if spans + else [] + ) - trace_id = hex(int(self._inmemory_exporter._real_exporter.trace_id))[2:] - self._trace_id = trace_id - file_path = f"{path}/{self.name}_{user_id}_{session_id}_{trace_id}.json" - with open(file_path, "w") as f: + self._trace_file_path = _build_trace_file_path(path, user_id, session_id) + with open(self._trace_file_path, "w") as f: json.dump( data, f, indent=4, ensure_ascii=False ) # ensure_ascii=False to support Chinese characters - self._trace_file_path = file_path - - for exporter in self.exporters: - if not isinstance(exporter, InMemoryExporter): - exporter.export() logger.info( - f"OpenTelemetryTracer tracing done, trace id: {self._trace_id} (hex)" + f"OpenTelemetryTracer dumps {len(spans)} spans to {self._trace_file_path}. 
Trace id: {self.trace_id} (hex)" ) - self._spans = spans - logger.info(f"OpenTelemetryTracer dumps {len(spans)} spans to {file_path}") - return file_path + return self._trace_file_path diff --git a/veadk/tracing/telemetry/telemetry.py b/veadk/tracing/telemetry/telemetry.py new file mode 100644 index 00000000..59dbf324 --- /dev/null +++ b/veadk/tracing/telemetry/telemetry.py @@ -0,0 +1,149 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
from typing import Any

from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.adk.tools import BaseTool
from opentelemetry import trace
from opentelemetry.sdk.trace import _Span

from veadk.tracing.telemetry.attributes.attributes import ATTRIBUTES
from veadk.tracing.telemetry.attributes.extractors.types import (
    ExtractorResponse,
    LLMAttributesParams,
    ToolAttributesParams,
)
from veadk.utils.logger import get_logger

logger = get_logger(__name__)


def upload_metrics(
    invocation_context: InvocationContext,
    llm_request: LlmRequest,
    llm_response: LlmResponse,
) -> None:
    """Record metrics for one LLM round-trip.

    Forwards the request/response pair to the ``meter_uploader`` of every
    exporter on every tracer attached to the agent. Does nothing when the
    agent is not a VeADK ``Agent`` or when no uploader is configured.
    """
    # Imported here to avoid a circular import (veadk.agent imports tracing).
    from veadk.agent import Agent

    if not isinstance(invocation_context.agent, Agent):
        return

    for tracer in invocation_context.agent.tracers:
        for exporter in getattr(tracer, "exporters", []):
            uploader = getattr(exporter, "meter_uploader", None)
            if uploader:
                uploader.record(llm_request, llm_response)


def trace_send_data():
    """Intentional no-op standing in for ``google.adk``'s ``trace_send_data``.

    VeADK exports span data through its own exporters, so the ADK-side hook
    is neutralized when patched in (see ``patch_google_adk_telemetry``).
    """
    ...


def set_common_attributes(
    invocation_context: InvocationContext, current_span: _Span, **kwargs
) -> None:
    """Apply the shared VeADK attributes to every span of the current trace.

    Collects all spans recorded by the agent's in-memory exporter that belong
    to the same trace as ``current_span``, tags invocation/agent spans with a
    ``gen_ai.operation.name``, and sets each extractor-driven common
    attribute (extractors receive ``**kwargs``).
    """
    from veadk.agent import Agent

    if current_span.context:
        # NOTE: despite coming from the *span* context, this is the TRACE id —
        # it is used below to select all spans of the same trace.
        current_trace_id = current_span.context.trace_id
    else:
        logger.warning(
            "Current span context is missing, failed to get `trace_id` to set common attributes."
        )
        return

    if isinstance(invocation_context.agent, Agent):
        try:
            from veadk.tracing.telemetry.opentelemetry_tracer import OpentelemetryTracer

            # Only the first tracer is consulted — presumably the one carrying
            # the InMemoryExporter that records all spans; TODO confirm.
            tracer: OpentelemetryTracer = invocation_context.agent.tracers[0]  # type: ignore
            spans = tracer._inmemory_exporter.processor.spans  # type: ignore

            spans_in_current_trace = [
                span
                for span in spans
                if span.context and span.context.trace_id == current_trace_id
            ]

            common_attributes = ATTRIBUTES.get("common", {})
            for span in spans_in_current_trace:
                if span.name.startswith("invocation"):
                    span.set_attribute("gen_ai.operation.name", "chain")
                elif span.name.startswith("agent_run"):
                    span.set_attribute("gen_ai.operation.name", "agent")
                for attr_name, attr_extractor in common_attributes.items():
                    value = attr_extractor(**kwargs)
                    span.set_attribute(attr_name, value)
        except Exception as e:
            # Best-effort: tracing must never break the agent run.
            logger.error(f"Failed to set common attributes for spans: {e}")
    else:
        logger.warning(
            "Failed to set common attributes for spans as your agent is not VeADK Agent. Skip this."
        )


def trace_tool_call(
    tool: BaseTool,
    args: dict[str, Any],
    function_response_event: Event,
) -> None:
    """Attach VeADK tool attributes to the currently active span."""
    span = trace.get_current_span()

    tool_attributes_mapping = ATTRIBUTES.get("tool", {})
    params = ToolAttributesParams(tool, args, function_response_event)

    for attr_name, attr_extractor in tool_attributes_mapping.items():
        response: ExtractorResponse = attr_extractor(params)
        ExtractorResponse.update_span(span, attr_name, response)


def trace_call_llm(
    invocation_context: InvocationContext,
    event_id: str,
    llm_request: LlmRequest,
    llm_response: LlmResponse,
) -> None:
    """Attach VeADK LLM attributes to the current span and upload metrics."""
    span = trace.get_current_span()

    from veadk.agent import Agent

    # Common attributes first, so every span of the trace (not only this one)
    # carries the agent/session identity.
    set_common_attributes(
        invocation_context=invocation_context,
        current_span=span,  # type: ignore
        agent_name=invocation_context.agent.name,
        user_id=invocation_context.user_id,
        app_name=invocation_context.app_name,
        session_id=invocation_context.session.id,
        model_provider=invocation_context.agent.model_provider
        if isinstance(invocation_context.agent, Agent)
        else "",
        model_name=invocation_context.agent.model_name
        if isinstance(invocation_context.agent, Agent)
        else "",
    )

    llm_attributes_mapping = ATTRIBUTES.get("llm", {})
    params = LLMAttributesParams(
        invocation_context=invocation_context,
        event_id=event_id,
        llm_request=llm_request,
        llm_response=llm_response,
    )

    for attr_name, attr_extractor in llm_attributes_mapping.items():
        response: ExtractorResponse = attr_extractor(params)
        ExtractorResponse.update_span(span, attr_name, response)

    upload_metrics(invocation_context, llm_request, llm_response)
def patch_google_adk_telemetry() -> None:
    """Rebind google.adk's tracing hooks to VeADK's implementations.

    Walks every already-imported ``google.adk`` module and replaces the
    module-level names ``trace_tool_call``, ``trace_call_llm`` and
    ``trace_send_data`` with the VeADK versions, so spans emitted by ADK
    internals carry VeADK attributes.

    NOTE(review): only modules imported *before* this call are patched —
    presumably it runs after google.adk is fully loaded; confirm at call site.
    """
    trace_functions: dict[str, Callable] = {
        "trace_tool_call": trace_tool_call,
        "trace_call_llm": trace_call_llm,
        "trace_send_data": trace_send_data,
    }

    # Snapshot the items: sys.modules may mutate while we patch (e.g. an
    # import happening on another thread would break the iteration).
    for mod_name, mod in list(sys.modules.items()):
        if not mod_name.startswith("google.adk"):
            continue
        # Look up the three hook names directly instead of scanning dir(mod).
        for var_name, replacement in trace_functions.items():
            original = getattr(mod, var_name, None)
            # Only replace an existing callable hook; never create new names.
            if callable(original):
                setattr(mod, var_name, replacement)
                logger.debug(
                    f"Patch {mod_name} {var_name} with {replacement}"
                )