diff --git a/veadk/integrations/ve_tos/ve_tos.py b/veadk/integrations/ve_tos/ve_tos.py index 356f7dd7..c28ae161 100644 --- a/veadk/integrations/ve_tos/ve_tos.py +++ b/veadk/integrations/ve_tos/ve_tos.py @@ -705,6 +705,80 @@ def download(self, bucket_name: str, object_key: str, save_path: str) -> bool: logger.error(f"Image download failed: {str(e)}") return False + def download_directory( + self, bucket_name: str, prefix: str, local_dir: str = "/tmp" + ) -> bool: + """Download entire directory from TOS bucket to local directory + + Args: + bucket_name: TOS bucket name + prefix: Directory prefix in TOS (e.g., "skills/pdf/") + local_dir: Local directory path to save files + + Returns: + bool: True if download succeeds, False otherwise + """ + bucket_name = self._check_bucket_name(bucket_name) + + if not self._client: + logger.error("TOS client is not initialized") + return False + + try: + # Ensure prefix ends with / + if prefix and not prefix.endswith("/"): + prefix += "/" + + # Create local directory if not exists + os.makedirs(local_dir, exist_ok=True) + + # List all objects with the prefix + is_truncated = True + next_continuation_token = "" + downloaded_count = 0 + + while is_truncated: + out = self._client.list_objects_type2( + bucket_name, + prefix=prefix, + continuation_token=next_continuation_token, + ) + is_truncated = out.is_truncated + next_continuation_token = out.next_continuation_token + + # Download each object + for content in out.contents: + object_key = content.key + + # Skip directory markers (objects ending with /) + if object_key.endswith("/"): + continue + + # Calculate relative path and local file path + relative_path = object_key[len(prefix) :] + local_file_path = os.path.join(local_dir, relative_path) + + # Create subdirectories if needed + local_file_dir = os.path.dirname(local_file_path) + if local_file_dir: + os.makedirs(local_file_dir, exist_ok=True) + + # Download the file + if self.download(bucket_name, object_key, local_file_path): + downloaded_count += 1 + logger.debug(f"Downloaded: {object_key} -> {local_file_path}") + else: + logger.warning(f"Failed to download: {object_key}") + + logger.info( + f"Downloaded {downloaded_count} files from {bucket_name}/{prefix} to {local_dir}" + ) + return downloaded_count > 0 + + except Exception as e: + logger.error(f"Failed to download directory: {str(e)}") + return False + def close(self): if self._client: self._client.close() diff --git a/veadk/tools/builtin_tools/execute_skills.py b/veadk/tools/builtin_tools/execute_skills.py new file mode 100644 index 00000000..799f85fe --- /dev/null +++ b/veadk/tools/builtin_tools/execute_skills.py @@ -0,0 +1,149 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os +from typing import Optional, List + +from google.adk.tools import ToolContext + +from veadk.config import getenv +from veadk.utils.logger import get_logger +from veadk.utils.volcengine_sign import ve_request +from veadk.auth.veauth.utils import get_credential_from_vefaas_iam + +logger = get_logger(__name__) + + +def execute_skills( + workflow_prompt: str, + skills: Optional[List[str]] = None, + tool_context: ToolContext = None, + timeout: int = 300, +) -> str: + """execute skills in a code sandbox and return the output. + For C++ code, don't execute it directly, compile and execute via Python; write sources and object files to /tmp. + + Args: + workflow_prompt (str): instruction of workflow + skills (Optional[List[str]]): The skills will be invoked + timeout (int, optional): The timeout in seconds for the code execution, less than or equal to 300. Defaults to 300. + + Returns: + str: The output of the code execution. + """ + + tool_id = getenv("AGENTKIT_TOOL_ID") + + service = getenv( + "AGENTKIT_TOOL_SERVICE_CODE", "agentkit" + ) # temporary service for code run tool + region = getenv("AGENTKIT_TOOL_REGION", "cn-beijing") + host = getenv( + "AGENTKIT_TOOL_HOST", service + "." + region + ".volces.com" + ) # temporary host for code run tool + logger.debug(f"tools endpoint: {host}") + + session_id = tool_context._invocation_context.session.id + agent_name = tool_context._invocation_context.agent.name + user_id = tool_context._invocation_context.user_id + tool_user_session_id = agent_name + "_" + user_id + "_" + session_id + logger.debug(f"tool_user_session_id: {tool_user_session_id}") + + logger.debug( + f"Execute skills in session_id={session_id}, tool_id={tool_id}, host={host}, service={service}, region={region}, timeout={timeout}" + ) + + ak = getenv("VOLCENGINE_ACCESS_KEY") + sk = getenv("VOLCENGINE_SECRET_KEY") + header = {} + + if not (ak and sk): + logger.debug("Get AK/SK from tool context failed.") + ak = os.getenv("VOLCENGINE_ACCESS_KEY") + sk = os.getenv("VOLCENGINE_SECRET_KEY") + if not (ak and sk): + logger.debug( + "Get AK/SK from environment variables failed. Try to use credential from Iam." + ) + credential = get_credential_from_vefaas_iam() + ak = credential.access_key_id + sk = credential.secret_access_key + header = {"X-Security-Token": credential.session_token} + else: + logger.debug("Successfully get AK/SK from environment variables.") + else: + logger.debug("Successfully get AK/SK from tool context.") + + cmd = ["python", "agent.py", workflow_prompt] + if skills: + cmd.extend(["--skills"] + skills) + + # TODO: remove after agentkit supports custom environment variables setting + env_vars = { + "MODEL_AGENT_API_KEY": os.getenv("MODEL_AGENT_API_KEY"), + "TOS_SKILLS_DIR": os.getenv("TOS_SKILLS_DIR"), + } + + code = f""" +import subprocess +import os + +env = os.environ.copy() +for key, value in {env_vars!r}.items(): + if key not in env: + env[key] = value + +result = subprocess.run( + {cmd!r}, + cwd='/home/gem/veadk_skills', + capture_output=True, + text=True, + env=env, + timeout={timeout - 10}, +) +print(result.stdout) +if result.stderr: + print(result.stderr) + """ + + res = ve_request( + request_body={ + "ToolId": tool_id, + "UserSessionId": tool_user_session_id, + "OperationType": "RunCode", + "OperationPayload": json.dumps( + { + "code": code, + "timeout": timeout, + "kernel_name": "python3", + } + ), + }, + action="InvokeTool", + ak=ak, + sk=sk, + service=service, + version="2025-10-30", + region=region, + host=host, + header=header, + ) + logger.debug(f"Invoke run code response: {res}") + + try: + return res["Result"]["Result"] + except KeyError as e: + logger.error(f"Error occurred while running code: {e}, response is {res}") + return res