From 7275d9ec95bfb119be5617197316d94bd93d7b84 Mon Sep 17 00:00:00 2001 From: ZQlQZ Date: Fri, 22 Aug 2025 12:34:06 +0800 Subject: [PATCH 01/11] feat: add toshandler --- veadk/database/tos/toshandler.py | 176 +++++++++++++++++++++++++++++++ veadk/runner.py | 19 +++- 2 files changed, 191 insertions(+), 4 deletions(-) create mode 100644 veadk/database/tos/toshandler.py diff --git a/veadk/database/tos/toshandler.py b/veadk/database/tos/toshandler.py new file mode 100644 index 00000000..7ee1cac2 --- /dev/null +++ b/veadk/database/tos/toshandler.py @@ -0,0 +1,176 @@ +import os +from veadk.config import getenv +from veadk.utils.logger import get_logger +import tos +from datetime import datetime +import asyncio +from typing import Literal +from urllib.parse import urlparse + +logger = get_logger(__name__) + + +class TOSHandler: + def __init__(self): + """Initialize TOS configuration information""" + self.region = getenv("VOLCENGINE_REGION") + self.ak = getenv("VOLCENGINE_ACCESS_KEY") + self.sk = getenv("VOLCENGINE_SECRET_KEY") + self.bucket_name = getenv("DATABASE_TOS_BUCKET") + + def _init_tos_client(self): + """initialize TOS client""" + try: + return tos.TosClientV2( + self.ak, + self.sk, + endpoint=f"tos-{self.region}.volces.com", + region=self.region, + ) + except Exception as e: + logger.error(f"Client initialization failed:{e}") + return None + + def get_suffix(self, data_path: str) -> str: + """Extract the complete file suffix with leading dot (including compound suffixes such as .tar.gz)""" + COMPOUND_SUFFIXES = { + "tar.gz", + "tar.bz2", + "tar.xz", + "tar.Z", + "tar.lz", + "tar.lzma", + "tar.lzo", + "gz", + "bz2", + "xz", + "Z", + "lz", + "lzma", + "lzo", + } + parsed = urlparse(data_path) + path = parsed.path if parsed.scheme in ("http", "https") else data_path + + filename = os.path.basename(path).split("?")[0].split("#")[0] + + parts = filename.split(".") + if len(parts) < 2: + return "" + for i in range(2, len(parts) + 1): + candidate = ".".join(parts[-i:]) + if candidate in COMPOUND_SUFFIXES: + return f".{candidate.lower()}" + return f".{parts[-1].lower()}" + + def gen_url(self, user_id, app_name, session_id, data_path): + """generate TOS URL""" + suffix = self.get_suffix(data_path) + timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3] + url = ( + f"{self.bucket_name}/{app_name}/{user_id}-{session_id}-{timestamp}{suffix}" + ) + return url + + def parse_url(self, url): + """Parse the URL to obtain bucket_name and object_key""" + """bucket_name/object_key""" + parts = url.split("/", 1) + if len(parts) < 2: + raise ValueError("URL format error, it should be: bucket_name/object_key") + return parts + + def create_bucket(self, client, bucket_name): + """If the bucket does not exist, create it""" + try: + client.head_bucket(self.bucket_name) + logger.debug(f"Bucket {bucket_name} already exists") + return True + except tos.exceptions.TosServerError as e: + if e.status_code == 404: + client.create_bucket( + bucket=bucket_name, + storage_class=tos.StorageClassType.Storage_Class_Standard, + acl=tos.ACLType.ACL_Private, + ) + logger.debug(f"Bucket {bucket_name} created successfully") + return True + except Exception as e: + logger.error(f"Bucket creation failed: {str(e)}") + return False + + def upload_to_tos(self, url: str, data, data_type: Literal["file", "bytes"]): + if data_type not in ("file", "bytes"): + error_msg = f"Upload failed: data_type error. Only 'file' and 'bytes' are supported, got {data_type}" + logger.error(error_msg) + raise ValueError(error_msg) + if data_type == "file": + return asyncio.to_thread(self._do_upload_file, url, data) + elif data_type == "bytes": + return asyncio.to_thread(self._do_upload_bytes, url, data) + + def _do_upload_bytes(self, url, bytes): + bucket_name, object_key = self.parse_url(url) + client = self._init_tos_client() + try: + if not client: + return False + if not self.create_bucket(client, bucket_name): + return False + + client.put_object(bucket=bucket_name, key=object_key, content=bytes) + return True + except Exception as e: + logger.error(f"Upload failed: {e}") + return False + finally: + if client: + client.close() + + def _do_upload_file(self, url, file_path): + bucket_name, object_key = self.parse_url(url) + client = self._init_tos_client() + try: + if not client: + return False + if not self.create_bucket(client, bucket_name): + return False + + client.put_object_from_file( + bucket=bucket_name, key=object_key, file_path=file_path + ) + return True + except Exception as e: + logger.error(f"Upload failed: {e}") + return False + finally: + if client: + client.close() + + def download_from_tos(self, url, save_path): + """download image from TOS""" + try: + bucket_name, object_key = self.parse_url(url) + client = self._init_tos_client() + if not client: + return False + + object_stream = client.get_object(bucket_name, object_key) + + save_dir = os.path.dirname(save_path) + if save_dir and not os.path.exists(save_dir): + os.makedirs(save_dir, exist_ok=True) + + with open(save_path, "wb") as f: + for chunk in object_stream: + f.write(chunk) + + logger.debug(f"Image download success, saved to: {save_path}") + client.close() + return True + + except Exception as e: + logger.error(f"Image download failed: {str(e)}") + if "client" in locals(): + client.close() + return False diff --git a/veadk/runner.py b/veadk/runner.py index d9afdaa3..a234a337 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import asyncio from typing import Union from google.adk.agents import RunConfig @@ -31,6 +32,7 @@ from veadk.types import MediaMessage from veadk.utils.logger import get_logger from veadk.utils.misc import read_png_to_bytes +from veadk.database.tos.toshandler import TOSHandler logger = get_logger(__name__) @@ -89,13 +91,22 @@ def __init__( plugins=plugins, ) - def _convert_messages(self, messages) -> list: + def _convert_messages(self, messages, session_id) -> list: if isinstance(messages, str): messages = [types.Content(role="user", parts=[types.Part(text=messages)])] elif isinstance(messages, MediaMessage): assert messages.media.endswith(".png"), ( "The MediaMessage only supports PNG format file for now." ) + data = read_png_to_bytes(messages.media) + url = messages.media + if self.agent.tracers: + tos_handler = TOSHandler() + url = tos_handler.gen_url( + self.user_id, self.app_name, session_id, messages.media + ) + asyncio.create_task(tos_handler.upload_to_tos(url, data, "bytes")) + messages = [ types.Content( role="user", @@ -103,8 +114,8 @@ def _convert_messages(self, messages) -> list: types.Part(text=messages.text), types.Part( inline_data=Blob( - display_name=messages.media, - data=read_png_to_bytes(messages.media), + display_name=url, + data=data, mime_type="image/png", ) ), @@ -164,7 +175,7 @@ async def run( stream: bool = False, save_tracing_data: bool = False, ): - converted_messages: list = self._convert_messages(messages) + converted_messages: list = self._convert_messages(messages, session_id) await self.short_term_memory.create_session( app_name=self.app_name, user_id=self.user_id, session_id=session_id From 4c9dd7eab2e4d0a7b6cf6cf369d6ef4d481696e0 Mon Sep 17 00:00:00 2001 From: ZQlQZ Date: Fri, 22 Aug 2025 16:05:31 +0800 Subject: [PATCH 02/11] feat: Normalize the code --- .../tos/{toshandler.py => tos_handler.py} | 28 +++++++++++-------- veadk/runner.py | 2 +- 2 files changed, 17 insertions(+), 13 deletions(-) rename veadk/database/tos/{toshandler.py => tos_handler.py} (87%) diff --git a/veadk/database/tos/toshandler.py b/veadk/database/tos/tos_handler.py similarity index 87% rename from veadk/database/tos/toshandler.py rename to veadk/database/tos/tos_handler.py index 7ee1cac2..ee1f042e 100644 --- a/veadk/database/tos/toshandler.py +++ b/veadk/database/tos/tos_handler.py @@ -4,7 +4,7 @@ import tos from datetime import datetime import asyncio -from typing import Literal +from typing import Literal, Union from urllib.parse import urlparse logger = get_logger(__name__) @@ -13,7 +13,7 @@ class TOSHandler: def __init__(self): """Initialize TOS configuration information""" - self.region = getenv("VOLCENGINE_REGION") + self.region = getenv("DATABASE_TOS_REGION") self.ak = getenv("VOLCENGINE_ACCESS_KEY") self.sk = getenv("VOLCENGINE_SECRET_KEY") self.bucket_name = getenv("DATABASE_TOS_BUCKET") @@ -63,16 +63,18 @@ def get_suffix(self, data_path: str) -> str: return f".{candidate.lower()}" return f".{parts[-1].lower()}" - def gen_url(self, user_id, app_name, session_id, data_path): + def gen_url( + self, user_id: str, app_name: str, session_id: str, data_path: str + ) -> str: """generate TOS URL""" - suffix = self.get_suffix(data_path) - timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3] - url = ( + suffix: str = self.get_suffix(data_path) + timestamp: str = datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3] + url: str = ( f"{self.bucket_name}/{app_name}/{user_id}-{session_id}-{timestamp}{suffix}" ) return url - def parse_url(self, url): + def parse_url(self, url: str) -> tuple[str, str]: """Parse the URL to obtain bucket_name and object_key""" """bucket_name/object_key""" parts = url.split("/", 1) @@ -80,7 +82,7 @@ def parse_url(self, url): raise ValueError("URL format error, it should be: bucket_name/object_key") return parts - def create_bucket(self, client, bucket_name): + def create_bucket(self, client: tos.TosClientV2, bucket_name: str) -> bool: """If the bucket does not exist, create it""" try: client.head_bucket(self.bucket_name) @@ -99,7 +101,9 @@ def create_bucket(self, client, bucket_name): logger.error(f"Bucket creation failed: {str(e)}") return False - def upload_to_tos(self, url: str, data, data_type: Literal["file", "bytes"]): + def upload_to_tos( + self, url: str, data: Union[str, bytes], data_type: Literal["file", "bytes"] + ): if data_type not in ("file", "bytes"): error_msg = f"Upload failed: data_type error. Only 'file' and 'bytes' are supported, got {data_type}" logger.error(error_msg) @@ -109,7 +113,7 @@ def upload_to_tos(self, url: str, data, data_type: Literal["file", "bytes"]): elif data_type == "bytes": return asyncio.to_thread(self._do_upload_bytes, url, data) - def _do_upload_bytes(self, url, bytes): + def _do_upload_bytes(self, url: str, bytes: bytes) -> bool: bucket_name, object_key = self.parse_url(url) client = self._init_tos_client() try: @@ -127,7 +131,7 @@ def _do_upload_bytes(self, url, bytes): if client: client.close() - def _do_upload_file(self, url, file_path): + def _do_upload_file(self, url: str, file_path: str) -> bool: bucket_name, object_key = self.parse_url(url) client = self._init_tos_client() try: @@ -147,7 +151,7 @@ def _do_upload_file(self, url, file_path): if client: client.close() - def download_from_tos(self, url, save_path): + def download_from_tos(self, url: str, save_path: str) -> bool: """download image from TOS""" try: bucket_name, object_key = self.parse_url(url) diff --git a/veadk/runner.py b/veadk/runner.py index a234a337..5bed1218 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -32,7 +32,7 @@ from veadk.types import MediaMessage from veadk.utils.logger import get_logger from veadk.utils.misc import read_png_to_bytes -from veadk.database.tos.toshandler import TOSHandler +from veadk.database.tos.tos_handler import TOSHandler logger = get_logger(__name__) From c23b33444816215c4ca21b25600cf5549015dd5a Mon Sep 17 00:00:00 2001 From: ZQlQZ Date: Fri, 22 Aug 2025 17:55:01 +0800 Subject: [PATCH 03/11] feat: add License Header --- tests/test_runner.py | 4 +-- veadk/database/tos/tos_handler.py | 47 ++++++++++++++++++------------- veadk/runner.py | 1 + 3 files changed, 30 insertions(+), 22 deletions(-) diff --git a/tests/test_runner.py b/tests/test_runner.py index 510a3323..b6a6ff8a 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -28,7 +28,7 @@ def _test_convert_messages(runner): role="user", ) ] - actual_message = runner._convert_messages(message) + actual_message = runner._convert_messages(message, session_id="test_session_id") assert actual_message == expected_message message = ["test message 1", "test message 2"] @@ -42,7 +42,7 @@ def _test_convert_messages(runner): role="user", ), ] - actual_message = runner._convert_messages(message) + actual_message = runner._convert_messages(message, session_id="test_session_id") assert actual_message == expected_message diff --git a/veadk/database/tos/tos_handler.py b/veadk/database/tos/tos_handler.py index ee1f042e..5751adc9 100644 --- a/veadk/database/tos/tos_handler.py +++ b/veadk/database/tos/tos_handler.py @@ -1,3 +1,17 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import os from veadk.config import getenv from veadk.utils.logger import get_logger @@ -17,6 +31,7 @@ def __init__(self): self.ak = getenv("VOLCENGINE_ACCESS_KEY") self.sk = getenv("VOLCENGINE_SECRET_KEY") self.bucket_name = getenv("DATABASE_TOS_BUCKET") + self.client = self._init_tos_client() def _init_tos_client(self): """initialize TOS client""" @@ -82,15 +97,15 @@ def parse_url(self, url: str) -> tuple[str, str]: raise ValueError("URL format error, it should be: bucket_name/object_key") return parts - def create_bucket(self, client: tos.TosClientV2, bucket_name: str) -> bool: + def create_bucket(self, bucket_name: str) -> bool: """If the bucket does not exist, create it""" try: - client.head_bucket(self.bucket_name) + self.client.head_bucket(self.bucket_name) logger.debug(f"Bucket {bucket_name} already exists") return True except tos.exceptions.TosServerError as e: if e.status_code == 404: - client.create_bucket( + self.client.create_bucket( bucket=bucket_name, storage_class=tos.StorageClassType.Storage_Class_Standard, acl=tos.ACLType.ACL_Private, @@ -115,21 +130,17 @@ def upload_to_tos( def _do_upload_bytes(self, url: str, bytes: bytes) -> bool: bucket_name, object_key = self.parse_url(url) - client = self._init_tos_client() try: - if not client: + if not self.client: return False - if not self.create_bucket(client, bucket_name): + if not self.create_bucket(bucket_name): return False - client.put_object(bucket=bucket_name, key=object_key, content=bytes) + self.client.put_object(bucket=bucket_name, key=object_key, content=bytes) return True except Exception as e: logger.error(f"Upload failed: {e}") return False - finally: - if client: - client.close() def _do_upload_file(self, url: str, file_path: str) -> bool: bucket_name, object_key = self.parse_url(url) @@ -147,19 +158,13 @@ def _do_upload_file(self, url: str, file_path: str) -> bool: except Exception as e: logger.error(f"Upload failed: {e}") return False - finally: - if client: - client.close() def download_from_tos(self, url: str, save_path: str) -> bool: """download image from TOS""" try: bucket_name, object_key = self.parse_url(url) - client = self._init_tos_client() - if not client: - return False - object_stream = client.get_object(bucket_name, object_key) + object_stream = self.client.get_object(bucket_name, object_key) save_dir = os.path.dirname(save_path) if save_dir and not os.path.exists(save_dir): @@ -170,11 +175,13 @@ def download_from_tos(self, url: str, save_path: str) -> bool: f.write(chunk) logger.debug(f"Image download success, saved to: {save_path}") - client.close() return True except Exception as e: logger.error(f"Image download failed: {str(e)}") - if "client" in locals(): - client.close() + return False + + def close_client(self): + if self.client: + self.client.close() diff --git a/veadk/runner.py b/veadk/runner.py index 5bed1218..03fbe882 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -106,6 +106,7 @@ def _convert_messages(self, messages, session_id) -> list: self.user_id, self.app_name, session_id, messages.media ) asyncio.create_task(tos_handler.upload_to_tos(url, data, "bytes")) + tos_handler.close_client() messages = [ types.Content( From 58d5130d704ac89830efc4fe26e59758db7d9cb7 Mon Sep 17 00:00:00 2001 From: ZQlQZ Date: Fri, 22 Aug 2025 18:29:10 +0800 Subject: [PATCH 04/11] feat: fix session_id --- veadk/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/veadk/runner.py b/veadk/runner.py index 03fbe882..7e466862 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -126,7 +126,7 @@ def _convert_messages(self, messages, session_id) -> list: elif isinstance(messages, list): converted_messages = [] for message in messages: - converted_messages.extend(self._convert_messages(message)) + converted_messages.extend(self._convert_messages(message, session_id)) messages = converted_messages else: raise ValueError(f"Unknown message type: {type(messages)}") From c699e9cc042f18d54737ed5b622953831e302df8 Mon Sep 17 00:00:00 2001 From: ZQlQZ Date: Mon, 25 Aug 2025 13:31:09 +0800 Subject: [PATCH 05/11] feat: add tos_config & modify the code --- veadk/database/tos/tos_client.py | 155 +++++++++++++++++++++++++ veadk/database/tos/tos_handler.py | 187 ------------------------------ veadk/runner.py | 31 +++-- 3 files changed, 179 insertions(+), 194 deletions(-) create mode 100644 veadk/database/tos/tos_client.py delete mode 100644 veadk/database/tos/tos_handler.py diff --git a/veadk/database/tos/tos_client.py b/veadk/database/tos/tos_client.py new file mode 100644 index 00000000..9e6c10a6 --- /dev/null +++ b/veadk/database/tos/tos_client.py @@ -0,0 +1,155 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from veadk.config import getenv +from veadk.utils.logger import get_logger +import tos +import asyncio +from typing import Literal, Union +from pydantic import BaseModel, Field +from typing import Optional, Any + +logger = get_logger(__name__) + + +class TOSConfig(BaseModel): + region: str = Field( + default_factory=lambda: getenv("DATABASE_TOS_REGION"), + description="TOS region", + ) + ak: str = Field( + default_factory=lambda: getenv("VOLCENGINE_ACCESS_KEY"), + description="Volcengine access key", + ) + sk: str = Field( + default_factory=lambda: getenv("VOLCENGINE_SECRET_KEY"), + description="Volcengine secret key", + ) + bucket_name: str = Field( + default_factory=lambda: getenv("DATABASE_TOS_BUCKET"), + description="TOS bucket name", + ) + + +class TOSClient(BaseModel): + config: TOSConfig = Field(default_factory=TOSConfig) + client: Optional[Any] = Field(default=None, description="TOS client instance") + + def __init__(self, **data: Any): + super().__init__(**data) + self.client = self._init() + + def _init(self): + """initialize TOS client""" + try: + return tos.TosClientV2( + self.config.ak, + self.config.sk, + endpoint=f"tos-{self.config.region}.volces.com", + region=self.config.region, + ) + except Exception as e: + logger.error(f"Client initialization failed:{e}") + return None + + def create_bucket(self) -> bool: + """If the bucket does not exist, create it""" + try: + self.client.head_bucket(self.config.bucket_name) + logger.info(f"Bucket {self.config.bucket_name} already exists") + return True + except tos.exceptions.TosServerError as e: + if e.status_code == 404: + self.client.create_bucket( + bucket=self.config.bucket_name, + storage_class=tos.StorageClassType.Storage_Class_Standard, + acl=tos.ACLType.ACL_Private, + ) + logger.info(f"Bucket {self.config.bucket_name} created successfully") + return True + except Exception as e: + logger.error(f"Bucket creation failed: {str(e)}") + return False + + def upload( + self, + object_key: str, + data: Union[str, bytes], + data_type: Literal["file", "bytes"], + ): + if data_type not in ("file", "bytes"): + error_msg = f"Upload failed: data_type error. Only 'file' and 'bytes' are supported, got {data_type}" + logger.error(error_msg) + raise ValueError(error_msg) + if data_type == "file": + return asyncio.to_thread(self._do_upload_file, object_key, data) + elif data_type == "bytes": + return asyncio.to_thread(self._do_upload_bytes, object_key, data) + + def _do_upload_bytes(self, object_key: str, bytes: bytes) -> bool: + try: + if not self.client: + return False + if not self.create_bucket(): + return False + + self.client.put_object( + bucket=self.config.bucket_name, key=object_key, content=bytes + ) + return True + except Exception as e: + logger.error(f"Upload failed: {e}") + return False + + def _do_upload_file(self, object_key: str, file_path: str) -> bool: + client = self._init_tos_client() + try: + if not client: + return False + if not self.create_bucket(client, self.config.bucket_name): + return False + + client.put_object_from_file( + bucket=self.config.bucket_name, key=object_key, file_path=file_path + ) + return True + except Exception as e: + logger.error(f"Upload failed: {e}") + return False + + def download(self, object_key: str, save_path: str) -> bool: + """download image from TOS""" + try: + object_stream = self.client.get_object(self.config.bucket_name, object_key) + + save_dir = os.path.dirname(save_path) + if save_dir and not os.path.exists(save_dir): + os.makedirs(save_dir, exist_ok=True) + + with open(save_path, "wb") as f: + for chunk in object_stream: + f.write(chunk) + + logger.debug(f"Image download success, saved to: {save_path}") + return True + + except Exception as e: + logger.error(f"Image download failed: {str(e)}") + + return False + + def close_client(self): + if self.client: + self.client.close() diff --git a/veadk/database/tos/tos_handler.py b/veadk/database/tos/tos_handler.py deleted file mode 100644 index 5751adc9..00000000 --- a/veadk/database/tos/tos_handler.py +++ /dev/null @@ -1,187 +0,0 @@ -# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -from veadk.config import getenv -from veadk.utils.logger import get_logger -import tos -from datetime import datetime -import asyncio -from typing import Literal, Union -from urllib.parse import urlparse - -logger = get_logger(__name__) - - -class TOSHandler: - def __init__(self): - """Initialize TOS configuration information""" - self.region = getenv("DATABASE_TOS_REGION") - self.ak = getenv("VOLCENGINE_ACCESS_KEY") - self.sk = getenv("VOLCENGINE_SECRET_KEY") - self.bucket_name = getenv("DATABASE_TOS_BUCKET") - self.client = self._init_tos_client() - - def _init_tos_client(self): - """initialize TOS client""" - try: - return tos.TosClientV2( - self.ak, - self.sk, - endpoint=f"tos-{self.region}.volces.com", - region=self.region, - ) - except Exception as e: - logger.error(f"Client initialization failed:{e}") - return None - - def get_suffix(self, data_path: str) -> str: - """Extract the complete file suffix with leading dot (including compound suffixes such as .tar.gz)""" - COMPOUND_SUFFIXES = { - "tar.gz", - "tar.bz2", - "tar.xz", - "tar.Z", - "tar.lz", - "tar.lzma", - "tar.lzo", - "gz", - "bz2", - "xz", - "Z", - "lz", - "lzma", - "lzo", - } - parsed = urlparse(data_path) - path = parsed.path if parsed.scheme in ("http", "https") else data_path - - filename = os.path.basename(path).split("?")[0].split("#")[0] - - parts = filename.split(".") - if len(parts) < 2: - return "" - for i in range(2, len(parts) + 1): - candidate = ".".join(parts[-i:]) - if candidate in COMPOUND_SUFFIXES: - return f".{candidate.lower()}" - return f".{parts[-1].lower()}" - - def gen_url( - self, user_id: str, app_name: str, session_id: str, data_path: str - ) -> str: - """generate TOS URL""" - suffix: str = self.get_suffix(data_path) - timestamp: str = datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3] - url: str = ( - f"{self.bucket_name}/{app_name}/{user_id}-{session_id}-{timestamp}{suffix}" - ) - return url - - def parse_url(self, url: str) -> tuple[str, str]: - """Parse the URL to obtain bucket_name and object_key""" - """bucket_name/object_key""" - parts = url.split("/", 1) - if len(parts) < 2: - raise ValueError("URL format error, it should be: bucket_name/object_key") - return parts - - def create_bucket(self, bucket_name: str) -> bool: - """If the bucket does not exist, create it""" - try: - self.client.head_bucket(self.bucket_name) - logger.debug(f"Bucket {bucket_name} already exists") - return True - except tos.exceptions.TosServerError as e: - if e.status_code == 404: - self.client.create_bucket( - bucket=bucket_name, - storage_class=tos.StorageClassType.Storage_Class_Standard, - acl=tos.ACLType.ACL_Private, - ) - logger.debug(f"Bucket {bucket_name} created successfully") - return True - except Exception as e: - logger.error(f"Bucket creation failed: {str(e)}") - return False - - def upload_to_tos( - self, url: str, data: Union[str, bytes], data_type: Literal["file", "bytes"] - ): - if data_type not in ("file", "bytes"): - error_msg = f"Upload failed: data_type error. Only 'file' and 'bytes' are supported, got {data_type}" - logger.error(error_msg) - raise ValueError(error_msg) - if data_type == "file": - return asyncio.to_thread(self._do_upload_file, url, data) - elif data_type == "bytes": - return asyncio.to_thread(self._do_upload_bytes, url, data) - - def _do_upload_bytes(self, url: str, bytes: bytes) -> bool: - bucket_name, object_key = self.parse_url(url) - try: - if not self.client: - return False - if not self.create_bucket(bucket_name): - return False - - self.client.put_object(bucket=bucket_name, key=object_key, content=bytes) - return True - except Exception as e: - logger.error(f"Upload failed: {e}") - return False - - def _do_upload_file(self, url: str, file_path: str) -> bool: - bucket_name, object_key = self.parse_url(url) - client = self._init_tos_client() - try: - if not client: - return False - if not self.create_bucket(client, bucket_name): - return False - - client.put_object_from_file( - bucket=bucket_name, key=object_key, file_path=file_path - ) - return True - except Exception as e: - logger.error(f"Upload failed: {e}") - return False - - def download_from_tos(self, url: str, save_path: str) -> bool: - """download image from TOS""" - try: - bucket_name, object_key = self.parse_url(url) - - object_stream = self.client.get_object(bucket_name, object_key) - - save_dir = os.path.dirname(save_path) - if save_dir and not os.path.exists(save_dir): - os.makedirs(save_dir, exist_ok=True) - - with open(save_path, "wb") as f: - for chunk in object_stream: - f.write(chunk) - - logger.debug(f"Image download success, saved to: {save_path}") - return True - - except Exception as e: - logger.error(f"Image download failed: {str(e)}") - - return False - - def close_client(self): - if self.client: - self.client.close() diff --git a/veadk/runner.py b/veadk/runner.py index 7e466862..9130a3ed 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -11,8 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os import asyncio from typing import Union +from urllib.parse import urlparse +from datetime import datetime from google.adk.agents import RunConfig from google.adk.agents.run_config import StreamingMode @@ -32,7 +35,7 @@ from veadk.types import MediaMessage from veadk.utils.logger import get_logger from veadk.utils.misc import read_png_to_bytes -from veadk.database.tos.tos_handler import TOSHandler +from veadk.database.tos.tos_client import TOSClient logger = get_logger(__name__) @@ -91,6 +94,21 @@ def __init__( plugins=plugins, ) + def _build_object_key( + self, user_id: str, app_name: str, session_id: str, data_path: str + ) -> str: + """generate TOS object key""" + parsed_url = urlparse(data_path) + + if parsed_url.scheme and parsed_url.scheme in ("http", "https", "ftp", "ftps"): + file_name = os.path.basename(parsed_url.path) + else: + file_name = os.path.basename(data_path) + + timestamp: str = datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3] + object_key: str = f"{app_name}-{user_id}-{session_id}/{timestamp}-{file_name}" + return object_key + def _convert_messages(self, messages, session_id) -> list: if isinstance(messages, str): messages = [types.Content(role="user", parts=[types.Part(text=messages)])] @@ -99,14 +117,13 @@ def _convert_messages(self, messages, session_id) -> list: "The MediaMessage only supports PNG format file for now." ) data = read_png_to_bytes(messages.media) - url = messages.media + object_key = messages.media if self.agent.tracers: - tos_handler = TOSHandler() - url = tos_handler.gen_url( + tos_client = TOSClient() + object_key = self._build_object_key( self.user_id, self.app_name, session_id, messages.media ) - asyncio.create_task(tos_handler.upload_to_tos(url, data, "bytes")) - tos_handler.close_client() + asyncio.create_task(tos_client.upload(object_key, data, "bytes")) messages = [ types.Content( @@ -115,7 +132,7 @@ def _convert_messages(self, messages, session_id) -> list: types.Part(text=messages.text), types.Part( inline_data=Blob( - display_name=url, + display_name=object_key, data=data, mime_type="image/png", ) From 2c446592d63d847cf3aca97a4e0f0a04b989eac8 Mon Sep 17 00:00:00 2001 From: ZQlQZ Date: Mon, 25 Aug 2025 14:41:47 +0800 Subject: [PATCH 06/11] feat: modify the code --- veadk/database/tos/tos_client.py | 50 +++++++++++++++----------------- veadk/runner.py | 3 +- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/veadk/database/tos/tos_client.py b/veadk/database/tos/tos_client.py index 9e6c10a6..a0a022d5 100644 --- a/veadk/database/tos/tos_client.py +++ b/veadk/database/tos/tos_client.py @@ -17,9 +17,9 @@ from veadk.utils.logger import get_logger import tos import asyncio -from typing import Literal, Union +from typing import Union from pydantic import BaseModel, Field -from typing import Optional, Any +from typing import Any logger = get_logger(__name__) @@ -45,21 +45,16 @@ class TOSConfig(BaseModel): class TOSClient(BaseModel): config: TOSConfig = Field(default_factory=TOSConfig) - client: Optional[Any] = Field(default=None, description="TOS client instance") - def __init__(self, **data: Any): - super().__init__(**data) - self.client = self._init() - - def _init(self): - """initialize TOS client""" + def model_post_init(self, __context: Any) -> None: try: - return tos.TosClientV2( + self._client = tos.TosClientV2( self.config.ak, self.config.sk, endpoint=f"tos-{self.config.region}.volces.com", region=self.config.region, ) + logger.info("Connected to TOS successfully.") except Exception as e: logger.error(f"Client initialization failed:{e}") return None @@ -67,12 +62,12 @@ def _init(self): def create_bucket(self) -> bool: """If the bucket does not exist, create it""" try: - self.client.head_bucket(self.config.bucket_name) + self._client.head_bucket(self.config.bucket_name) logger.info(f"Bucket {self.config.bucket_name} already exists") return True except tos.exceptions.TosServerError as e: if e.status_code == 404: - self.client.create_bucket( + self._client.create_bucket( bucket=self.config.bucket_name, storage_class=tos.StorageClassType.Storage_Class_Standard, acl=tos.ACLType.ACL_Private, @@ -87,10 +82,13 @@ def upload( self, object_key: str, data: Union[str, bytes], - data_type: Literal["file", "bytes"], ): - if data_type not in ("file", "bytes"): - error_msg = f"Upload failed: data_type error. Only 'file' and 'bytes' are supported, got {data_type}" + if isinstance(data, str): + data_type = "file" + elif isinstance(data, bytes): + data_type = "bytes" + else: + error_msg = f"Upload failed: data type error. Only str (file path) and bytes are supported, got {type(data)}" logger.error(error_msg) raise ValueError(error_msg) if data_type == "file": @@ -100,30 +98,30 @@ def upload( def _do_upload_bytes(self, object_key: str, bytes: bytes) -> bool: try: - if not self.client: + if not self._client: return False if not self.create_bucket(): return False - - self.client.put_object( + self._client.put_object( bucket=self.config.bucket_name, key=object_key, content=bytes ) + logger.debug(f"Upload success, object_key: {object_key}") return True except Exception as e: logger.error(f"Upload failed: {e}") return False def _do_upload_file(self, object_key: str, file_path: str) -> bool: - client = self._init_tos_client() try: - if not client: + if not self._client: return False - if not self.create_bucket(client, self.config.bucket_name): + if not self.create_bucket(self._client, self.config.bucket_name): return False - client.put_object_from_file( + self._client.put_object_from_file( bucket=self.config.bucket_name, key=object_key, file_path=file_path ) + logger.debug(f"Upload success, object_key: {object_key}") return True except Exception as e: logger.error(f"Upload failed: {e}") @@ -132,7 +130,7 @@ def _do_upload_file(self, object_key: str, file_path: str) -> bool: def download(self, object_key: str, save_path: str) -> bool: """download image from TOS""" try: - object_stream = self.client.get_object(self.config.bucket_name, object_key) + object_stream = self._client.get_object(self.config.bucket_name, object_key) save_dir = os.path.dirname(save_path) if save_dir and not os.path.exists(save_dir): @@ -150,6 +148,6 @@ def download(self, object_key: str, save_path: str) -> bool: return False - def close_client(self): - if self.client: - self.client.close() + def close(self): + if self._client: + self._client.close() diff --git a/veadk/runner.py b/veadk/runner.py index 9130a3ed..063b3e95 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -123,7 +123,8 @@ def _convert_messages(self, messages, session_id) -> list: object_key = self._build_object_key( self.user_id, self.app_name, session_id, messages.media ) - asyncio.create_task(tos_client.upload(object_key, data, "bytes")) + asyncio.create_task(tos_client.upload(object_key, data)) + tos_client.close() messages = [ types.Content( From 1364a8c22335477bb4dc8f59b368f609a7efcd65 Mon Sep 17 00:00:00 2001 From: ZQlQZ Date: Mon, 25 Aug 2025 14:46:02 +0800 Subject: [PATCH 07/11] feat: modify _build_tos_object_key --- veadk/runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/veadk/runner.py b/veadk/runner.py index 063b3e95..ca83520f 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -94,7 +94,7 @@ def __init__( plugins=plugins, ) - def _build_object_key( + def _build_tos_object_key( self, user_id: str, app_name: str, session_id: str, data_path: str ) -> str: """generate TOS object key""" @@ -120,7 +120,7 @@ def _convert_messages(self, messages, session_id) -> list: object_key = messages.media if self.agent.tracers: tos_client = TOSClient() - object_key = self._build_object_key( + object_key = self._build_tos_object_key( self.user_id, self.app_name, session_id, messages.media ) asyncio.create_task(tos_client.upload(object_key, data)) From 808e9b44cc101ed037e8b32709370a421e46b32c Mon Sep 17 00:00:00 2001 From: ZQlQZ Date: Tue, 26 Aug 2025 10:31:58 +0800 Subject: [PATCH 08/11] feat: add test_tos.py --- tests/test_tos.py | 108 +++++++++++++++++++++++++++++++ veadk/database/tos/tos_client.py | 2 +- 2 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 tests/test_tos.py diff --git a/tests/test_tos.py b/tests/test_tos.py new file mode 100644 index 00000000..185cda71 --- /dev/null +++ b/tests/test_tos.py @@ -0,0 +1,108 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from unittest import mock +import veadk.database.tos.tos_client as tos_mod + +# 使用 pytest-asyncio +pytest_plugins = ("pytest_asyncio",) + + +@pytest.fixture +def mock_client(monkeypatch): + fake_client = mock.Mock() + + monkeypatch.setattr(tos_mod.tos, "TosClientV2", lambda *a, **k: fake_client) + + class FakeExceptions: + class TosServerError(Exception): + def __init__(self, msg): + super().__init__(msg) + self.status_code = None + + monkeypatch.setattr(tos_mod.tos, "exceptions", FakeExceptions) + monkeypatch.setattr( + tos_mod.tos, + "StorageClassType", + type("S", (), {"Storage_Class_Standard": "STANDARD"}), + ) + monkeypatch.setattr( + tos_mod.tos, "ACLType", type("A", (), {"ACL_Private": "private"}) + ) + + return fake_client + + +@pytest.fixture +def tos_client(mock_client): + return tos_mod.TOSClient() + + +def test_create_bucket_exists(tos_client, mock_client): + mock_client.head_bucket.return_value = None # head_bucket 正常返回表示存在 + result = tos_client.create_bucket() + assert result is True + mock_client.create_bucket.assert_not_called() + + +def test_create_bucket_not_exists(tos_client, mock_client): + exc = tos_mod.tos.exceptions.TosServerError("not found") + exc.status_code = 404 + mock_client.head_bucket.side_effect = exc + + result = tos_client.create_bucket() + assert result is True + mock_client.create_bucket.assert_called_once() + + +@pytest.mark.asyncio +async def test_upload_bytes_success(tos_client, mock_client): + mock_client.head_bucket.return_value = True + data = b"hello world" + + result = await tos_client.upload("obj-key", data) + assert result is True + mock_client.put_object.assert_called_once() + + +@pytest.mark.asyncio +async def test_upload_file_success(tmp_path, tos_client, mock_client): + mock_client.head_bucket.return_value = True + file_path = tmp_path / "file.txt" + file_path.write_text("hello file") + + result = await tos_client.upload("obj-key", str(file_path)) + assert result is True + mock_client.put_object_from_file.assert_called_once() + + +def test_download_success(tmp_path, tos_client, mock_client): + save_path = tmp_path / "out.txt" + mock_client.get_object.return_value = [b"abc", b"def"] + + result = tos_client.download("obj-key", str(save_path)) + assert result is True + assert save_path.read_bytes() == b"abcdef" + + +def test_download_fail(tos_client, mock_client): + mock_client.get_object.side_effect = Exception("boom") + result = tos_client.download("obj-key", "somewhere.txt") + assert result is False + + +def test_close(tos_client, mock_client): + tos_client.close() + mock_client.close.assert_called_once() diff --git a/veadk/database/tos/tos_client.py b/veadk/database/tos/tos_client.py index a0a022d5..12a30f4f 100644 --- a/veadk/database/tos/tos_client.py +++ b/veadk/database/tos/tos_client.py @@ -115,7 +115,7 @@ def _do_upload_file(self, object_key: str, file_path: str) -> bool: try: if not self._client: return False - if not self.create_bucket(self._client, self.config.bucket_name): + if not self.create_bucket(): return False self._client.put_object_from_file( From 30b61e7d92ed104d0b267db8a48ac7cf0b9b58c6 Mon Sep 17 00:00:00 2001 From: ZQlQZ Date: Tue, 26 Aug 2025 15:59:29 +0800 Subject: [PATCH 09/11] update code & add try-except in runner --- veadk/runner.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/veadk/runner.py b/veadk/runner.py index 17afd6bd..d6ea92d4 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -98,6 +98,12 @@ def _build_tos_object_key( object_key: str = f"{app_name}-{user_id}-{session_id}/{timestamp}-{file_name}" return object_key + def _upload_to_tos(self, data: Union[str, bytes], object_key: str): + tos_client = TOSClient() + asyncio.create_task(tos_client.upload(object_key, data)) + tos_client.close() + return + def _convert_messages(self, messages, session_id) -> list: if isinstance(messages, str): messages = [types.Content(role="user", parts=[types.Part(text=messages)])] @@ -106,14 +112,14 @@ def _convert_messages(self, messages, session_id) -> list: "The MediaMessage only supports PNG format file for now." ) data = read_png_to_bytes(messages.media) - object_key = messages.media - if self.agent.tracers: - tos_client = TOSClient() + try: object_key = self._build_tos_object_key( self.user_id, self.app_name, session_id, messages.media ) - asyncio.create_task(tos_client.upload(object_key, data)) - tos_client.close() + self._upload_to_tos(data, object_key) + except Exception as e: + logger.error(f"Upload to TOS failed: {e}") + object_key = None messages = [ types.Content( From 28e2a40c5da1711f58a4fbe4f812dce19a7b8cbf Mon Sep 17 00:00:00 2001 From: ZQlQZ Date: Fri, 29 Aug 2025 17:50:54 +0800 Subject: [PATCH 10/11] add image url in coze --- tests/test_tos.py | 11 ++--- .../ve_tos/ve_tos.py} | 27 ++++++++++++- veadk/runner.py | 40 +++++-------------- .../extractors/llm_attributes_extractors.py | 17 ++++++++ veadk/tracing/telemetry/telemetry.py | 11 +++++ 5 files changed, 66 insertions(+), 40 deletions(-) rename veadk/{database/tos/tos_client.py => integrations/ve_tos/ve_tos.py} (85%) diff --git a/tests/test_tos.py b/tests/test_tos.py index 185cda71..3b6bf8f5 100644 --- a/tests/test_tos.py +++ b/tests/test_tos.py @@ -14,7 +14,7 @@ import pytest from unittest import mock -import veadk.database.tos.tos_client as tos_mod +import veadk.integrations.ve_tos.ve_tos as tos_mod # 使用 pytest-asyncio pytest_plugins = ("pytest_asyncio",) @@ -47,7 +47,7 @@ def __init__(self, msg): @pytest.fixture def tos_client(mock_client): - return tos_mod.TOSClient() + return tos_mod.VeTOS() def test_create_bucket_exists(tos_client, mock_client): @@ -75,6 +75,7 @@ async def test_upload_bytes_success(tos_client, mock_client): result = await tos_client.upload("obj-key", data) assert result is True mock_client.put_object.assert_called_once() + mock_client.close.assert_called_once() @pytest.mark.asyncio @@ -86,6 +87,7 @@ async def test_upload_file_success(tmp_path, tos_client, mock_client): result = await tos_client.upload("obj-key", str(file_path)) assert result is True mock_client.put_object_from_file.assert_called_once() + mock_client.close.assert_called_once() def test_download_success(tmp_path, tos_client, mock_client): @@ -101,8 +103,3 @@ def test_download_fail(tos_client, mock_client): mock_client.get_object.side_effect = Exception("boom") result = tos_client.download("obj-key", "somewhere.txt") assert result is False - - -def test_close(tos_client, mock_client): - tos_client.close() - mock_client.close.assert_called_once() diff --git a/veadk/database/tos/tos_client.py b/veadk/integrations/ve_tos/ve_tos.py similarity index 85% rename from veadk/database/tos/tos_client.py rename to veadk/integrations/ve_tos/ve_tos.py index 12a30f4f..c5d93fb6 100644 --- a/veadk/database/tos/tos_client.py +++ b/veadk/integrations/ve_tos/ve_tos.py @@ -20,6 +20,8 @@ from typing import Union from pydantic import BaseModel, Field from typing import Any +from urllib.parse import urlparse +from datetime import datetime logger = get_logger(__name__) @@ -43,7 +45,7 @@ class TOSConfig(BaseModel): ) -class TOSClient(BaseModel): +class VeTOS(BaseModel): config: TOSConfig = Field(default_factory=TOSConfig) def model_post_init(self, __context: Any) -> None: @@ -78,6 +80,23 @@ def create_bucket(self) -> bool: logger.error(f"Bucket creation failed: {str(e)}") return False + def build_tos_url( + self, user_id: str, app_name: str, session_id: str, data_path: str + ) -> tuple[str, str]: + """generate TOS object key""" + parsed_url = urlparse(data_path) + + if parsed_url.scheme and parsed_url.scheme in ("http", "https", "ftp", "ftps"): + file_name = os.path.basename(parsed_url.path) + else: + file_name = os.path.basename(data_path) + + timestamp: str = datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3] + object_key: str = f"{app_name}-{user_id}-{session_id}/{timestamp}-{file_name}" + tos_url: str = f"https://{self.config.bucket_name}.tos-{self.config.region}.volces.com/{object_key}" + + return object_key, tos_url + def upload( self, object_key: str, @@ -106,9 +125,11 @@ def _do_upload_bytes(self, object_key: str, bytes: bytes) -> bool: bucket=self.config.bucket_name, key=object_key, content=bytes ) logger.debug(f"Upload success, object_key: {object_key}") + self._close() return True except Exception as e: logger.error(f"Upload failed: {e}") + self._close() return False def _do_upload_file(self, object_key: str, file_path: str) -> bool: @@ -121,10 +142,12 @@ def _do_upload_file(self, object_key: str, file_path: str) -> bool: self._client.put_object_from_file( bucket=self.config.bucket_name, key=object_key, file_path=file_path ) + self._close() logger.debug(f"Upload success, object_key: {object_key}") return True except Exception as e: logger.error(f"Upload failed: {e}") + self._close() return False def download(self, object_key: str, save_path: str) -> bool: @@ -148,6 +171,6 @@ def download(self, object_key: str, save_path: str) -> bool: return False - def close(self): + def _close(self): if self._client: self._client.close() diff --git a/veadk/runner.py b/veadk/runner.py index 4d134af9..54d8442a 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -11,11 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import os import asyncio from typing import Union -from urllib.parse import urlparse -from datetime import datetime from google.adk.agents import RunConfig from google.adk.agents.invocation_context import LlmCallsLimitExceededError @@ -35,7 +32,7 @@ from veadk.types import MediaMessage from veadk.utils.logger import get_logger from veadk.utils.misc import read_png_to_bytes -from veadk.database.tos.tos_client import TOSClient +from veadk.integrations.ve_tos.ve_tos import VeTOS logger = get_logger(__name__) @@ -89,27 +86,6 @@ def __init__( plugins=plugins, ) - def _build_tos_object_key( - self, user_id: str, app_name: str, session_id: str, data_path: str - ) -> str: - """generate TOS object key""" - parsed_url = urlparse(data_path) - - if parsed_url.scheme and parsed_url.scheme in ("http", "https", "ftp", "ftps"): - file_name = os.path.basename(parsed_url.path) - else: - file_name = os.path.basename(data_path) - - timestamp: str = datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3] - object_key: str = f"{app_name}-{user_id}-{session_id}/{timestamp}-{file_name}" - return object_key - - def _upload_to_tos(self, data: Union[str, bytes], object_key: str): - tos_client = TOSClient() - asyncio.create_task(tos_client.upload(object_key, data)) - tos_client.close() - return - def _convert_messages(self, messages, session_id) -> list: if isinstance(messages, str): messages = [types.Content(role="user", parts=[types.Part(text=messages)])] @@ -118,14 +94,16 @@ def _convert_messages(self, messages, session_id) -> list: "The MediaMessage only supports PNG format file for now." ) data = read_png_to_bytes(messages.media) + + ve_tos = VeTOS() + object_key, tos_url = ve_tos.build_tos_url( + self.user_id, self.app_name, session_id, messages.media + ) try: - object_key = self._build_tos_object_key( - self.user_id, self.app_name, session_id, messages.media - ) - self._upload_to_tos(data, object_key) + asyncio.create_task(ve_tos.upload(object_key, data)) except Exception as e: logger.error(f"Upload to TOS failed: {e}") - object_key = None + tos_url = None messages = [ types.Content( @@ -134,7 +112,7 @@ def _convert_messages(self, messages, session_id) -> list: types.Part(text=messages.text), types.Part( inline_data=Blob( - display_name=object_key, + display_name=tos_url, data=data, mime_type="image/png", ) diff --git a/veadk/tracing/telemetry/attributes/extractors/llm_attributes_extractors.py b/veadk/tracing/telemetry/attributes/extractors/llm_attributes_extractors.py index 379c033a..d8a1141a 100644 --- a/veadk/tracing/telemetry/attributes/extractors/llm_attributes_extractors.py +++ b/veadk/tracing/telemetry/attributes/extractors/llm_attributes_extractors.py @@ -137,6 +137,15 @@ def llm_gen_ai_prompt(params: LLMAttributesParams) -> ExtractorResponse: if part.function_call.args else json.dumps({}) ) + # image + if part.inline_data: + message[f"gen_ai.prompt.{idx}.type"] = "image_url" + message[f"gen_ai.prompt.{idx}.image_url.name"] = ( + part.inline_data.display_name.split("/")[-1] + ) + message[f"gen_ai.prompt.{idx}.image_url.url"] = ( + part.inline_data.display_name + ) if message: messages.append(message) @@ -234,6 +243,14 @@ def llm_gen_ai_user_message(params: LLMAttributesParams) -> ExtractorResponse: message_part[f"parts.{idx}.content"] = str( part.function_response ) + if part.inline_data: + message_part[f"parts.{idx}.type"] = "image_url" + message_part[f"parts.{idx}.image_url.name"] = ( + part.inline_data.display_name.split("/")[-1] + ) + message_part[f"parts.{idx}.image_url.url"] = ( + part.inline_data.display_name + ) message_parts.append(message_part) diff --git a/veadk/tracing/telemetry/telemetry.py b/veadk/tracing/telemetry/telemetry.py index 041b31d2..2f732973 100644 --- a/veadk/tracing/telemetry/telemetry.py +++ b/veadk/tracing/telemetry/telemetry.py @@ -87,6 +87,17 @@ def _set_agent_input_attribute( "gen_ai.user.message", {f"parts.{idx}.type": "text", f"parts.{idx}.content": part.text}, ) + if part.inline_data: + span.add_event( + "gen_ai.user.message", + { + f"parts.{idx}.type": "image_url", + f"parts.{idx}.image_url.name": part.inline_data.display_name.split( + "/" + )[-1], + f"parts.{idx}.image_url.url": part.inline_data.display_name, + }, + ) def _set_agent_output_attribute(span: _Span, llm_response: LlmResponse) -> None: From e623c9646743b0a900410c469e23edc9eb71a2a1 Mon Sep 17 00:00:00 2001 From: ZQlQZ Date: Fri, 29 Aug 2025 17:59:45 +0800 Subject: [PATCH 11/11] fix tos --- tests/test_tos.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_tos.py b/tests/test_tos.py index 3b6bf8f5..51595184 100644 --- a/tests/test_tos.py +++ b/tests/test_tos.py @@ -24,6 +24,11 @@ def mock_client(monkeypatch): fake_client = mock.Mock() + monkeypatch.setenv("DATABASE_TOS_REGION", "test-region") + monkeypatch.setenv("VOLCENGINE_ACCESS_KEY", "test-access-key") + monkeypatch.setenv("VOLCENGINE_SECRET_KEY", "test-secret-key") + monkeypatch.setenv("DATABASE_TOS_BUCKET", "test-bucket") + monkeypatch.setattr(tos_mod.tos, "TosClientV2", lambda *a, **k: fake_client) class FakeExceptions: