diff --git a/veadk/tracing/telemetry/attributes/extractors/llm_attributes_extractors.py b/veadk/tracing/telemetry/attributes/extractors/llm_attributes_extractors.py index a43cd090..e24e18eb 100644 --- a/veadk/tracing/telemetry/attributes/extractors/llm_attributes_extractors.py +++ b/veadk/tracing/telemetry/attributes/extractors/llm_attributes_extractors.py @@ -144,6 +144,8 @@ def llm_gen_ai_prompt(params: LLMAttributesParams) -> ExtractorResponse: message[f"gen_ai.prompt.{idx}.type"] = "image_url" message[f"gen_ai.prompt.{idx}.image_url.name"] = ( part.inline_data.display_name.split("/")[-1] + if part.inline_data.display_name + else "" ) message[f"gen_ai.prompt.{idx}.image_url.url"] = ( part.inline_data.display_name @@ -190,135 +192,241 @@ def llm_gen_ai_completion(params: LLMAttributesParams) -> ExtractorResponse: return ExtractorResponse(content=messages) -def llm_gen_ai_is_streaming(params: LLMAttributesParams) -> ExtractorResponse: - # return params.llm_request.stream - return ExtractorResponse(content=None) - - -def llm_gen_ai_operation_name(params: LLMAttributesParams) -> ExtractorResponse: - return ExtractorResponse(content="chat") - - -def llm_gen_ai_system_message(params: LLMAttributesParams) -> ExtractorResponse: - event_attributes = { - "content": str(params.llm_request.config.system_instruction), - "role": "system", - } - return ExtractorResponse(type="event", content=event_attributes) - +def llm_gen_ai_messages(params: LLMAttributesParams) -> ExtractorResponse: + events = [] -def llm_gen_ai_user_message(params: LLMAttributesParams) -> ExtractorResponse: - # a content is a message - messages = [] + # system message + events.append( + { + "gen_ai.system.message": { + "role": "system", + "content": str(params.llm_request.config.system_instruction), + } + } + ) - for content in params.llm_request.contents: - if content.role == "user": - message_parts = [] - - if content.parts: - if len(content.parts) == 1: - if content.parts[0].text: - message_parts.append( - { - "role": content.role, - "content": content.parts[0].text, - } - ) - elif content.parts[0].function_response: - message_parts.append( - { - "role": content.role, - "content": str( - content.parts[0].function_response.response - ), - } - ) - else: - message_part = {"role": content.role} + # user, tool, and assistant message + if params.llm_request and params.llm_request.contents: + for content in params.llm_request.contents: + if content and content.parts: + # content.role == "user" + # part.function_response -> gen_ai.tool.message + # not part.function_response -> gen_ai.user.message + # content.role == "model" -> gen_ai.assistant.message + if content.role == "user": + user_event = {} + user_event["gen_ai.user.message"] = {"role": content.role} for idx, part in enumerate(content.parts): - # text part - if part.text: - message_part[f"parts.{idx}.type"] = "text" - message_part[f"parts.{idx}.content"] = part.text - # function response if part.function_response: - message_part[f"parts.{idx}.type"] = "function" - message_part[f"parts.{idx}.content"] = str( - part.function_response + events.append( + { + "gen_ai.tool.message": { + "role": "tool", + "id": part.function_response.id, + "content": safe_json_serialize( + part.function_response.response + ), + } + } ) - if part.inline_data: - message_part[f"parts.{idx}.type"] = "image_url" - message_part[f"parts.{idx}.image_url.name"] = ( - part.inline_data.display_name.split("/")[-1] + else: + if part.text: + if len(content.parts) == 1: + user_event["gen_ai.user.message"].update( + 
{"content": part.text} + ) + else: + user_event["gen_ai.user.message"].update( + { + f"parts.{idx}.type": "text", + f"parts.{idx}.text": part.text, + }, + ) + if part.inline_data: + if len(content.parts) == 1: + # TODO(qingliang) + pass + else: + user_event["gen_ai.user.message"].update( + { + f"parts.{idx}.type": "image_url", + f"parts.{idx}.image_url.name": ( + part.inline_data.display_name.split( + "/" + )[-1] + if part.inline_data.display_name + else "" + ), + f"parts.{idx}.image_url.url": ( + part.inline_data.display_name + if part.inline_data.display_name + else "" + ), + } + ) + # in case of only function response + if len(user_event["gen_ai.user.message"].items()) > 1: + events.append(user_event) + elif content.role == "model": + event = {} + event["gen_ai.assistant.message"] = {"role": content.role} + for idx, part in enumerate(content.parts): + if part.text: + event["gen_ai.assistant.message"].update( + { + f"parts.{idx}.type": "text", + f"parts.{idx}.text": part.text, + } ) - message_part[f"parts.{idx}.image_url.url"] = ( - part.inline_data.display_name + if part.function_call: + event["gen_ai.assistant.message"].update( + { + "tool_calls.0.id": str(part.function_call.id), + "tool_calls.0.type": "function", + "tool_calls.0.function.name": part.function_call.name + if part.function_call.name + else "", + "tool_calls.0.function.arguments": safe_json_serialize( + part.function_call.args + ) + if part.function_call.args + else json.dumps({}), + } ) + events.append(event) - message_parts.append(message_part) - - if message_parts: - messages.extend(message_parts) - - return ExtractorResponse(type="event", content=messages) + return ExtractorResponse(type="event_list", content=events) -def llm_gen_ai_assistant_message(params: LLMAttributesParams) -> ExtractorResponse: - # a content is a message - messages = [] +def llm_gen_ai_is_streaming(params: LLMAttributesParams) -> ExtractorResponse: + # return params.llm_request.stream + return ExtractorResponse(content=None) - # each part in each content we make it a message - # e.g. 
2 contents and 3 parts each means 6 messages - for content in params.llm_request.contents: - if content.role == "model": - message_parts = [] - - # each part we make it a message - if content.parts: - # only one part - if len(content.parts) == 1: - if content.parts[0].text: - message_parts.append( - { - "role": content.role, - "content": content.parts[0].text, - } - ) - elif content.parts[0].function_call: - pass - # multiple parts - else: - message_part = {"role": content.role} - for idx, part in enumerate(content.parts): - # parse content - if part.text: - message_part[f"parts.{idx}.type"] = "text" - message_part[f"parts.{idx}.content"] = part.text - # parse tool_calls - if part.function_call: - message_part["tool_calls.0.id"] = ( - part.function_call.id - if part.function_call.id - else "" - ) - message_part["tool_calls.0.type"] = "function" - message_part["tool_calls.0.function.name"] = ( - part.function_call.name - if part.function_call.name - else "" - ) - message_part["tool_calls.0.function.arguments"] = ( - safe_json_serialize(part.function_call.args) - if part.function_call.args - else json.dumps({}) - ) - message_parts.append(message_part) +def llm_gen_ai_operation_name(params: LLMAttributesParams) -> ExtractorResponse: + return ExtractorResponse(content="chat") - if message_parts: - messages.extend(message_parts) - return ExtractorResponse(type="event", content=messages) +# def llm_gen_ai_system_message(params: LLMAttributesParams) -> ExtractorResponse: +# event_attributes = { +# "content": str(params.llm_request.config.system_instruction), +# "role": "system", +# } +# return ExtractorResponse(type="event", content=event_attributes) + + +# def llm_gen_ai_user_message(params: LLMAttributesParams) -> ExtractorResponse: +# # a content is a message +# messages = [] + +# for content in params.llm_request.contents: +# if content.role == "user": +# message_parts = [] + +# if content.parts: +# if len(content.parts) == 1: +# if content.parts[0].text: +# message_parts.append( +# { +# "role": content.role, +# "content": content.parts[0].text, +# } +# ) +# elif content.parts[0].function_response: +# message_parts.append( +# { +# "role": content.role, +# "content": str( +# content.parts[0].function_response.response +# ), +# } +# ) +# else: +# message_part = {"role": content.role} +# for idx, part in enumerate(content.parts): +# # text part +# if part.text: +# message_part[f"parts.{idx}.type"] = "text" +# message_part[f"parts.{idx}.content"] = part.text +# # function response +# if part.function_response: +# message_part[f"parts.{idx}.type"] = "function" +# message_part[f"parts.{idx}.content"] = str( +# part.function_response +# ) +# if part.inline_data: +# message_part[f"parts.{idx}.type"] = "image_url" +# message_part[f"parts.{idx}.image_url.name"] = ( +# part.inline_data.display_name.split("/")[-1] +# ) +# message_part[f"parts.{idx}.image_url.url"] = ( +# part.inline_data.display_name +# ) + +# message_parts.append(message_part) + +# if message_parts: +# messages.extend(message_parts) + +# return ExtractorResponse(type="event", content=messages) + + +# def llm_gen_ai_assistant_message(params: LLMAttributesParams) -> ExtractorResponse: +# # a content is a message +# messages = [] + +# # each part in each content we make it a message +# # e.g. 
2 contents and 3 parts each means 6 messages
+#     for content in params.llm_request.contents:
+#         if content.role == "model":
+#             message_parts = []
+
+#             # each part we make it a message
+#             if content.parts:
+#                 # only one part
+#                 if len(content.parts) == 1:
+#                     if content.parts[0].text:
+#                         message_parts.append(
+#                             {
+#                                 "role": content.role,
+#                                 "content": content.parts[0].text,
+#                             }
+#                         )
+#                     elif content.parts[0].function_call:
+#                         pass
+#                 # multiple parts
+#                 else:
+#                     message_part = {"role": content.role}
+
+#                     for idx, part in enumerate(content.parts):
+#                         # parse content
+#                         if part.text:
+#                             message_part[f"parts.{idx}.type"] = "text"
+#                             message_part[f"parts.{idx}.content"] = part.text
+#                         # parse tool_calls
+#                         if part.function_call:
+#                             message_part["tool_calls.0.id"] = (
+#                                 part.function_call.id
+#                                 if part.function_call.id
+#                                 else ""
+#                             )
+#                             message_part["tool_calls.0.type"] = "function"
+#                             message_part["tool_calls.0.function.name"] = (
+#                                 part.function_call.name
+#                                 if part.function_call.name
+#                                 else ""
+#                             )
+#                             message_part["tool_calls.0.function.arguments"] = (
+#                                 safe_json_serialize(part.function_call.args)
+#                                 if part.function_call.args
+#                                 else json.dumps({})
+#                             )
+#                     message_parts.append(message_part)
+
+#             if message_parts:
+#                 messages.extend(message_parts)
+
+#     return ExtractorResponse(type="event", content=messages)
 
 
 def llm_gen_ai_choice(params: LLMAttributesParams) -> ExtractorResponse:
@@ -416,36 +524,52 @@ def llm_gen_ai_request_functions(params: LLMAttributesParams) -> ExtractorRespon
 
 
 LLM_ATTRIBUTES = {
-    # ===== request attributes =====
+    # -> 1. attributes
+    # -> 1.1. request
     "gen_ai.request.model": llm_gen_ai_request_model,
     "gen_ai.request.type": llm_gen_ai_request_type,
     "gen_ai.request.max_tokens": llm_gen_ai_request_max_tokens,
     "gen_ai.request.temperature": llm_gen_ai_request_temperature,
     "gen_ai.request.top_p": llm_gen_ai_request_top_p,
+    # required by CozeLoop
    "gen_ai.request.functions": llm_gen_ai_request_functions,
-    # ===== response attributes =====
+    # -> 1.2. response
     "gen_ai.response.model": llm_gen_ai_response_model,
     "gen_ai.response.stop_reason": llm_gen_ai_response_stop_reason,
     "gen_ai.response.finish_reason": llm_gen_ai_response_finish_reason,
-    # ===== streaming =====
+    # -> 1.3. streaming
     "gen_ai.is_streaming": llm_gen_ai_is_streaming,
-    # ===== span type =====
+    # -> 1.4. span kind
     "gen_ai.operation.name": llm_gen_ai_operation_name,
-    # ===== inputs and outputs =====
-    # events
-    "gen_ai.system.message": llm_gen_ai_system_message,
-    "gen_ai.user.message": llm_gen_ai_user_message,
-    "gen_ai.assistant.message": llm_gen_ai_assistant_message,
-    "gen_ai.choice": llm_gen_ai_choice,
-    # attributes
+    # -> 1.5. inputs
     "gen_ai.prompt": llm_gen_ai_prompt,
+    # -> 1.6. outputs
     "gen_ai.completion": llm_gen_ai_completion,
-    # "input.value": llm_input_value,
-    # "output.value": llm_output_value,
-    # ===== usage =====
+    # -> 1.7. usage
     "gen_ai.usage.input_tokens": llm_gen_ai_usage_input_tokens,
     "gen_ai.usage.output_tokens": llm_gen_ai_usage_output_tokens,
     "gen_ai.usage.total_tokens": llm_gen_ai_usage_total_tokens,
     "gen_ai.usage.cache_creation_input_tokens": llm_gen_ai_usage_cache_creation_input_tokens,
     "gen_ai.usage.cache_read_input_tokens": llm_gen_ai_usage_cache_read_input_tokens,
+    # -> 2. events
+    # -> 2.1. inputs
+    # To adapt both OpenTelemetry and CozeLoop rendering, and to avoid
+    # mis-ordering tool-call and tool-response events, we use
+    # `llm_gen_ai_messages` to upload the system, user, tool, and
+    # assistant messages together.
+    # Correct sequence: system message, user message, tool message,
+    # and assistant message.
+    "gen_ai.messages": llm_gen_ai_messages,
+    # [deprecated]
+    # "gen_ai.system.message": llm_gen_ai_system_message,
+    # [deprecated]
+    # "gen_ai.user.message": llm_gen_ai_user_message,
+    # [deprecated]
+    # "gen_ai.assistant.message": llm_gen_ai_assistant_message,
+    # -> 2.2. outputs
+    "gen_ai.choice": llm_gen_ai_choice,
+    # [debugging]
+    # "input.value": llm_input_value,
+    # [debugging]
+    # "output.value": llm_output_value,
 }
diff --git a/veadk/tracing/telemetry/attributes/extractors/types.py b/veadk/tracing/telemetry/attributes/extractors/types.py
index 7852f41d..87b83255 100644
--- a/veadk/tracing/telemetry/attributes/extractors/types.py
+++ b/veadk/tracing/telemetry/attributes/extractors/types.py
@@ -26,13 +26,14 @@
 
 @dataclass
 class ExtractorResponse:
-    content: list | dict | None | str | int | float
+    content: Any
 
-    type: Literal["attribute", "event"] = "attribute"
+    type: Literal["attribute", "event", "event_list"] = "attribute"
     """Type of extractor response.
 
     `attribute`: span.add_attribute(attr_name, attr_value)
     `event`: span.add_event(...)
+    `event_list`: span.add_event(...) for each event in the list
     """
 
     @staticmethod
@@ -41,7 +42,7 @@ def update_span(
     ) -> None:
         if response.type == "attribute":
             res = response.content
-            if isinstance(res, list):  # list[dict]
+            if isinstance(res, list):
                 for _res in res:
                     if isinstance(_res, dict):
                         for k, v in _res.items():
@@ -54,7 +55,17 @@
                 span.add_event(attr_name, response.content)
             elif isinstance(response.content, list):
                 for event in response.content:
-                    span.add_event(attr_name, event)
+                    span.add_event(attr_name, event)  # type: ignore
+        elif response.type == "event_list":
+            if isinstance(response.content, list):
+                for event in response.content:
+                    if isinstance(event, dict):
+                        # this dict is guaranteed to hold exactly one key-value pair
+                        key, value = next(iter(event.items()))
+                        span.add_event(key, value)
+                    else:
+                        # Unsupported event entry, discard it.
+                        pass
         else:
             # Unsupported response type, discard it.
             pass
diff --git a/veadk/tracing/telemetry/opentelemetry_tracer.py b/veadk/tracing/telemetry/opentelemetry_tracer.py
index 34e6e436..841b65ec 100644
--- a/veadk/tracing/telemetry/opentelemetry_tracer.py
+++ b/veadk/tracing/telemetry/opentelemetry_tracer.py
@@ -30,9 +30,9 @@
 from veadk.tracing.base_tracer import BaseTracer
 from veadk.tracing.telemetry.exporters.apmplus_exporter import APMPlusExporter
 from veadk.tracing.telemetry.exporters.base_exporter import BaseExporter
-from veadk.tracing.telemetry.exporters.inmemory_exporter import InMemoryExporter
 from veadk.tracing.telemetry.exporters.inmemory_exporter import (
     _INMEMORY_EXPORTER_INSTANCE,
+    InMemoryExporter,
 )
 from veadk.utils.logger import get_logger
 from veadk.utils.patches import patch_google_adk_telemetry
@@ -58,6 +58,8 @@ class OpentelemetryTracer(BaseModel, BaseTracer):
         description="The exporters to export spans.",
     )
 
+    # Forbid InMemoryExporter in the exporters list,
+    # because VeADK installs its own in-memory span processor
     @field_validator("exporters")
     @classmethod
     def forbid_inmemory_exporter(cls, v: list[BaseExporter]) -> list[BaseExporter]:
@@ -67,14 +69,18 @@ def forbid_inmemory_exporter(cls, v: list[BaseExporter]) -> list[BaseExporter]:
         return v
 
     def model_post_init(self, context: Any) -> None:
+        # Replace the Google ADK tracing functions
+        # `trace_call_llm` and `trace_tool_call`
         patch_google_adk_telemetry()
 
-        self._init_global_tracer_provider()
-
-        # GoogleADKInstrumentor().instrument()
-
-    def _init_global_tracer_provider(self) -> None:
+        # Keep internal processors so tracing data can be dumped later
         self._processors = []
 
+        # Initialize the global tracer provider to avoid conflicts with
+        # the VeFaaS global tracer provider
+        self._init_global_tracer_provider()
+
+    def _init_global_tracer_provider(self) -> None:
         # set provider anyway, then get global provider
         trace_api.set_tracer_provider(trace_sdk.TracerProvider())
         global_tracer_provider: TracerProvider = trace_api.get_tracer_provider()  # type: ignore
diff --git a/veadk/tracing/telemetry/telemetry.py b/veadk/tracing/telemetry/telemetry.py
index 2f732973..4210061a 100644
--- a/veadk/tracing/telemetry/telemetry.py
+++ b/veadk/tracing/telemetry/telemetry.py
@@ -92,10 +92,16 @@ def _set_agent_input_attribute(
                     "gen_ai.user.message",
                     {
                         f"parts.{idx}.type": "image_url",
-                        f"parts.{idx}.image_url.name": part.inline_data.display_name.split(
-                            "/"
-                        )[-1],
-                        f"parts.{idx}.image_url.url": part.inline_data.display_name,
+                        f"parts.{idx}.image_url.name": (
+                            part.inline_data.display_name.split("/")[-1]
+                            if part.inline_data.display_name
+                            else ""
+                        ),
+                        f"parts.{idx}.image_url.url": (
+                            part.inline_data.display_name
+                            if part.inline_data.display_name
+                            else ""
+                        ),
                     },
                 )
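
A minimal, self-contained sketch of how the new `event_list` response type is
consumed. The `StubSpan` class and the sample event payloads below are
illustrative assumptions, not part of this patch; the real logic lives in
`ExtractorResponse.update_span` (types.py), and the event dicts mirror what
`llm_gen_ai_messages` emits.

from dataclasses import dataclass
from typing import Any, Literal


@dataclass
class ExtractorResponse:
    content: Any
    type: Literal["attribute", "event", "event_list"] = "attribute"


class StubSpan:
    """Hypothetical stand-in for an OpenTelemetry span."""

    def add_event(self, name: str, attributes: dict) -> None:
        print(f"{name}: {attributes}")


def update_span(span: StubSpan, response: ExtractorResponse) -> None:
    # Mirrors the new `event_list` branch: each list entry is a dict
    # holding exactly one {event_name: event_attributes} pair, so the
    # event name comes from the dict itself rather than a fixed attr_name.
    if response.type == "event_list" and isinstance(response.content, list):
        for event in response.content:
            if isinstance(event, dict):
                key, value = next(iter(event.items()))
                span.add_event(key, value)


# Sample events in the order `llm_gen_ai_messages` builds them, which keeps
# tool responses correctly sequenced between user and assistant turns.
events = [
    {"gen_ai.system.message": {"role": "system", "content": "You are helpful."}},
    {"gen_ai.user.message": {"role": "user", "content": "What is the weather?"}},
    {"gen_ai.tool.message": {"role": "tool", "id": "call_0", "content": "{}"}},
    {"gen_ai.assistant.message": {"role": "model", "parts.0.type": "text",
                                  "parts.0.text": "It is sunny."}},
]

update_span(StubSpan(), ExtractorResponse(content=events, type="event_list"))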