From 34c70b5ea409d702aa5a922f84a3f2783e8d2c61 Mon Sep 17 00:00:00 2001 From: tangou Date: Thu, 7 Aug 2025 11:14:52 +0800 Subject: [PATCH 1/3] feat(vefaas): vefaas update function --- veadk/cli/services/vefaas/vefaas.py | 124 ++++++++++++++++++++++++++++ veadk/cloud/cloud_agent_engine.py | 64 ++++++++++++++ 2 files changed, 188 insertions(+) diff --git a/veadk/cli/services/vefaas/vefaas.py b/veadk/cli/services/vefaas/vefaas.py index 3902098b..7e5550ae 100644 --- a/veadk/cli/services/vefaas/vefaas.py +++ b/veadk/cli/services/vefaas/vefaas.py @@ -27,6 +27,12 @@ ) import veadk.config +from volcenginesdkvefaas.models.env_for_update_function_input import ( + EnvForUpdateFunctionInput, +) +from volcenginesdkvefaas.models.tag_for_update_function_input import ( + TagForUpdateFunctionInput, +) from veadk.cli.services.veapig.apig import APIGateway from veadk.utils.logger import get_logger from veadk.utils.misc import formatted_timestamp @@ -299,3 +305,121 @@ def deploy( logger.info(f"VeFaaS application {name} with ID {app_id} deployed on {url}.") return url, app_id, function_id + + def update( + self, + name: str, # application name + path: str, + ) -> tuple[str, str, str]: + """Update existing application function code while preserving URL. + + Args: + name (str): Application name to update. + path (str): Local project path. + + Returns: + tuple[str, str, str]: URL, app_id, function_id + """ + # Naming check + if "_" in name: + raise ValueError("Function or Application name cannot contain '_'.") + + # Find existing application + app_id = self.find_app_id_by_name(name) + if not app_id: + raise ValueError( + f"Application '{name}' not found. Use deploy() for new applications." + ) + + # Get application status and extract function info + status, full_response = self._get_application_status(app_id) + if status == "deploy_fail": + raise ValueError( + f"Cannot update failed application. Current status: {status}" + ) + + # Extract function name from application config + cloud_resource = full_response["Result"]["CloudResource"] + cloud_resource = json.loads(cloud_resource) + function_name = cloud_resource["framework"]["function"]["Name"] + # existing_url = cloud_resource["framework"]["url"]["system_url"] + function_id = cloud_resource["framework"]["function"]["Id"] + if not function_id: + raise ValueError(f"Function '{function_name}' not found for update") + + logger.info( + f"Start to update VeFaaS function {function_name} with path {path}." + ) + + # Update function with new code + self._update_function_code(function_id, path) + + logger.info(f"VeFaaS function {function_name} with ID {function_id} updated.") + + logger.info(f"Start to release VeFaaS application {app_id}.") + + # Release the application to apply changes + url = self._release_application(app_id) + + logger.info(f"VeFaaS application {name} with ID {app_id} released.") + + logger.info(f"VeFaaS application {name} with ID {app_id} updated on {url}.") + + return url, app_id, function_id + + def _update_function_code(self, function_id: str, path: str): + """Update function code by copying deploy process and using update_function. + + Args: + function_id (str): Target function ID to update. + path (str): Local project path. + """ + # 1. Read envs + envs = [] + for key, value in veadk.config.veadk_environments.items(): + envs.append(EnvForUpdateFunctionInput(key=key, value=value)) + logger.info( + f"Fetch {len(envs)} environment variables.", + ) + + # 2. Get a temp bucket to store code + code_zip_data, code_zip_size, error = zip_and_encode_folder(path) + logger.info( + f"Zipped project size: {code_zip_size / 1024 / 1024:.2f} MB", + ) + + # 3. Upload code to VeFaaS temp bucket + req = volcenginesdkvefaas.GetCodeUploadAddressRequest( + function_id=function_id, content_length=code_zip_size + ) + response = self.client.get_code_upload_address(req) + upload_url = response.upload_address + + headers = { + "Content-Type": "application/zip", + } + response = requests.put(url=upload_url, data=code_zip_data, headers=headers) + if not (200 <= response.status_code < 300): + error_message = f"Upload failed to {upload_url} with status code {response.status_code}: {response.text}" + raise ValueError(error_message) + + # 4. Mount the TOS bucket to function instance + _ = signed_request( + ak=self.ak, + sk=self.sk, + target="CodeUploadCallback", + body={"FunctionId": function_id}, + ) + + # 5. Use update_function client method to apply changes + _ = self.client.update_function( + volcenginesdkvefaas.UpdateFunctionRequest( + id=function_id, + description="Updated by VeADK (Volcengine Agent Development Kit)", + tags=[TagForUpdateFunctionInput(key="provider", value="veadk")], + request_timeout=1800, # Keep same timeout as deploy + envs=envs, + ) + ) + + logger.info(f"Function updated successfully: {function_id}") diff --git a/veadk/cloud/cloud_agent_engine.py b/veadk/cloud/cloud_agent_engine.py index 7b85f8ab..1e71a7ef 100644 --- a/veadk/cloud/cloud_agent_engine.py +++ b/veadk/cloud/cloud_agent_engine.py @@ -167,3 +167,67 @@ def remove(self, app_name: str): else: app_id = self._vefaas_service.find_app_id_by_name(app_name) self._vefaas_service.delete(app_id) + + def update( + self, + application_name: str, + path: str, + use_studio: bool = False, + use_adk_web: bool = False, + ) -> CloudApp: + """Update existing agent project code while keeping the same URL. + + Args: + application_name (str): Existing application name to update. + path (str): Local agent project path. + use_studio (bool): Whether to use studio mode. + use_adk_web (bool): Whether to use ADK web mode. + + Returns: + CloudApp: Updated cloud app with same endpoint. + """ + assert not (use_studio and use_adk_web), ( + "use_studio and use_adk_web can not be True at the same time." + ) + + # prevent deepeval writing operations + import veadk.config + + veadk.config.veadk_environments["DEEPEVAL_TELEMETRY_OPT_OUT"] = "YES" + + if use_studio: + veadk.config.veadk_environments["USE_STUDIO"] = "True" + else: + import veadk.config + + veadk.config.veadk_environments["USE_STUDIO"] = "False" + + if use_adk_web: + import veadk.config + + veadk.config.veadk_environments["USE_ADK_WEB"] = "True" + else: + import veadk.config + + veadk.config.veadk_environments["USE_ADK_WEB"] = "False" + + # convert `path` to absolute path + path = str(Path(path).resolve()) + self._prepare(path, application_name) + + try: + vefaas_application_url, app_id, function_id = self._vefaas_service.update( + name=application_name, + path=path, + ) + _ = function_id # for future use + + return CloudApp( + vefaas_application_name=application_name, + vefaas_endpoint=vefaas_application_url, + vefaas_application_id=app_id, + ) + except Exception as e: + raise ValueError( + f"Failed to update agent project on Volcengine FaaS platform. Error: {e}" + ) From 12c1bcd8bb5d876f9452b939dfe9ae95407dde9a Mon Sep 17 00:00:00 2001 From: tangou Date: Thu, 7 Aug 2025 16:49:34 +0800 Subject: [PATCH 2/3] fix(vefaas): fix _update_function_code() without envs; create _upload_and_mount_code() --- veadk/cli/services/vefaas/vefaas.py | 156 +++++++++++----------------- veadk/cloud/cloud_agent_engine.py | 40 ++----- 2 files changed, 66 insertions(+), 130 deletions(-) diff --git a/veadk/cli/services/vefaas/vefaas.py b/veadk/cli/services/vefaas/vefaas.py index 7e5550ae..2964b11d 100644 --- a/veadk/cli/services/vefaas/vefaas.py +++ b/veadk/cli/services/vefaas/vefaas.py @@ -27,9 +27,6 @@ ) import veadk.config -from volcenginesdkvefaas.models.env_for_update_function_input import ( - EnvForUpdateFunctionInput, -) from volcenginesdkvefaas.models.tag_for_update_function_input import ( TagForUpdateFunctionInput, ) @@ -65,36 +62,20 @@ def __init__(self, access_key: str, secret_key: str, region: str = "cn-beijing") self.template_id = "6874f3360bdbc40008ecf8c7" - def _create_function(self, function_name: str, path: str): - # 1. Read envs - envs = [] - for key, value in veadk.config.veadk_environments.items(): - envs.append(EnvForCreateFunctionInput(key=key, value=value)) - logger.info( - f"Fetch {len(envs)} environment variables.", - ) + def _upload_and_mount_code(self, function_id: str, path: str): + """Upload code to VeFaaS temp bucket and mount to function instance. - # 2. Create function - res = self.client.create_function( - volcenginesdkvefaas.CreateFunctionRequest( - command="./run.sh", - name=function_name, - description="Created by VeADK (Volcengine Agent Development Kit)", - tags=[TagForCreateFunctionInput(key="provider", value="veadk")], - runtime="native-python3.10/v1", - request_timeout=1800, - envs=envs, - ) - ) - function_id = res.id - - # 3. Get a temp bucket to store code + Args: + function_id (str): Target function ID. + path (str): Local project path. + """ + # Get zipped code data code_zip_data, code_zip_size, error = zip_and_encode_folder(path) logger.info( f"Zipped project size: {code_zip_size / 1024 / 1024:.2f} MB", ) - # 4. Upload code to VeFaaS temp bucket + # Upload code to VeFaaS temp bucket req = volcenginesdkvefaas.GetCodeUploadAddressRequest( function_id=function_id, content_length=code_zip_size ) @@ -109,7 +90,7 @@ def _create_function(self, function_name: str, path: str): error_message = f"Upload failed to {upload_url} with status code {response.status_code}: {response.text}" raise ValueError(error_message) - # 5. Mount the TOS bucket to function instance + # Mount the TOS bucket to function instance res = signed_request( ak=self.ak, sk=self.sk, @@ -117,6 +98,34 @@ def _create_function(self, function_name: str, path: str): body={"FunctionId": function_id}, ) + return res + + def _create_function(self, function_name: str, path: str): + # Read envs + envs = [] + for key, value in veadk.config.veadk_environments.items(): + envs.append(EnvForCreateFunctionInput(key=key, value=value)) + logger.info( + f"Fetch {len(envs)} environment variables.", + ) + + # Create function + res = self.client.create_function( + volcenginesdkvefaas.CreateFunctionRequest( + command="./run.sh", + name=function_name, + description="Created by VeADK (Volcengine Agent Development Kit)", + tags=[TagForCreateFunctionInput(key="provider", value="veadk")], + runtime="native-python3.10/v1", + request_timeout=1800, + envs=envs, + ) + ) + function_id = res.id + + # Upload and mount code using extracted method + self._upload_and_mount_code(function_id, path) + return function_name, function_id def _create_application( @@ -306,29 +315,29 @@ def deploy( return url, app_id, function_id - def update( + def _update_function_code( self, - name: str, # application name + application_name: str, # application name path: str, ) -> tuple[str, str, str]: """Update existing application function code while preserving URL. Args: - name (str): Application name to update. + application_name (str): Application name to update. path (str): Local project path. Returns: tuple[str, str, str]: URL, app_id, function_id """ # Naming check - if "_" in name: + if "_" in application_name: raise ValueError("Function or Application name cannot contain '_'.") # Find existing application - app_id = self.find_app_id_by_name(name) + app_id = self.find_app_id_by_name(application_name) if not app_id: raise ValueError( - f"Application '{name}' not found. Use deploy() for new applications." + f"Application '{application_name}' not found. Use deploy() for new applications." ) # Get application status and extract function info @@ -351,75 +360,30 @@ def update( f"Start to update VeFaaS function {function_name} with path {path}." ) - # Update function with new code - self._update_function_code(function_id, path) + # Upload and mount code using extracted method + self._upload_and_mount_code(function_id, path) - logger.info(f"VeFaaS function {function_name} with ID {function_id} updated.") - - logger.info(f"Start to release VeFaaS application {app_id}.") - - # Release the application to apply changes - url = self._release_application(app_id) - - logger.info(f"VeFaaS application {name} with ID {app_id} released.") - - logger.info(f"VeFaaS application {name} with ID {app_id} updated on {url}.") - - return url, app_id, function_id - - def _update_function_code(self, function_id: str, path: str): - """Update function code by copying deploy process and using update_function. - - Args: - function_id (str): Target function ID to update. - path (str): Local project path. - """ - # 1. Read envs - envs = [] - for key, value in veadk.config.veadk_environments.items(): - envs.append(EnvForUpdateFunctionInput(key=key, value=value)) - logger.info( - f"Fetch {len(envs)} environment variables.", - ) - - # 2. Get a temp bucket to store code - code_zip_data, code_zip_size, error = zip_and_encode_folder(path) - logger.info( - f"Zipped project size: {code_zip_size / 1024 / 1024:.2f} MB", - ) - - # 3. Upload code to VeFaaS temp bucket - req = volcenginesdkvefaas.GetCodeUploadAddressRequest( - function_id=function_id, content_length=code_zip_size - ) - response = self.client.get_code_upload_address(req) - upload_url = response.upload_address - - headers = { - "Content-Type": "application/zip", - } - response = requests.put(url=upload_url, data=code_zip_data, headers=headers) - if not (200 <= response.status_code < 300): - error_message = f"Upload failed to {upload_url} with status code {response.status_code}: {response.text}" - raise ValueError(error_message) - - # 4. Mount the TOS bucket to function instance - _ = signed_request( - ak=self.ak, - sk=self.sk, - target="CodeUploadCallback", - body={"FunctionId": function_id}, - ) - - # 5. Use update_function client method to apply changes - _ = self.client.update_function( + # Use update_function client method to apply changes + self.client.update_function( volcenginesdkvefaas.UpdateFunctionRequest( id=function_id, description="Updated by VeADK (Volcengine Agent Development Kit)", tags=[TagForUpdateFunctionInput(key="provider", value="veadk")], request_timeout=1800, # Keep same timeout as deploy - envs=envs, ) ) logger.info(f"Function updated successfully: {function_id}") + + logger.info(f"VeFaaS function {function_name} with ID {function_id} updated.") + + # Release the application to apply changes + url = self._release_application(app_id) + + logger.info(f"VeFaaS application {application_name} with ID {app_id} released.") + + logger.info( + f"VeFaaS application {application_name} with ID {app_id} updated on {url}." + ) + + return url, app_id, function_id diff --git a/veadk/cloud/cloud_agent_engine.py b/veadk/cloud/cloud_agent_engine.py index 1e71a7ef..17c48f8d 100644 --- a/veadk/cloud/cloud_agent_engine.py +++ b/veadk/cloud/cloud_agent_engine.py @@ -168,59 +168,31 @@ def remove(self, app_name: str): app_id = self._vefaas_service.find_app_id_by_name(app_name) self._vefaas_service.delete(app_id) - def update( + def update_function_code( self, application_name: str, path: str, - use_studio: bool = False, - use_adk_web: bool = False, ) -> CloudApp: """Update existing agent project code while keeping the same URL. Args: application_name (str): Existing application name to update. path (str): Local agent project path. - use_studio (bool): Whether to use studio mode. - use_adk_web (bool): Whether to use ADK web mode. Returns: CloudApp: Updated cloud app with same endpoint. """ - assert not (use_studio and use_adk_web), ( - "use_studio and use_adk_web can not be True at the same time." - ) - - # prevent deepeval writing operations - import veadk.config - - veadk.config.veadk_environments["DEEPEVAL_TELEMETRY_OPT_OUT"] = "YES" - - if use_studio: - veadk.config.veadk_environments["USE_STUDIO"] = "True" - else: - import veadk.config - - veadk.config.veadk_environments["USE_STUDIO"] = "False" - - if use_adk_web: - import veadk.config - - veadk.config.veadk_environments["USE_ADK_WEB"] = "True" - else: - import veadk.config - - veadk.config.veadk_environments["USE_ADK_WEB"] = "False" - # convert `path` to absolute path path = str(Path(path).resolve()) self._prepare(path, application_name) try: - vefaas_application_url, app_id, function_id = self._vefaas_service.update( - name=application_name, - path=path, + vefaas_application_url, app_id, function_id = ( + self._vefaas_service._update_function_code( + application_name=application_name, + path=path, + ) ) - _ = function_id # for future use return CloudApp( vefaas_application_name=application_name, From 537f21f4046c5c52f8d89d396edbf372f41c23fd Mon Sep 17 00:00:00 2001 From: tangou Date: Thu, 7 Aug 2025 17:18:29 +0800 Subject: [PATCH 3/3] fix(vefaas): adjust _update_function_code position --- veadk/cli/services/vefaas/vefaas.py | 147 ++++++++++++++-------------- 1 file changed, 71 insertions(+), 76 deletions(-) diff --git a/veadk/cli/services/vefaas/vefaas.py b/veadk/cli/services/vefaas/vefaas.py index 2964b11d..f0d61c15 100644 --- a/veadk/cli/services/vefaas/vefaas.py +++ b/veadk/cli/services/vefaas/vefaas.py @@ -27,9 +27,6 @@ ) import veadk.config -from volcenginesdkvefaas.models.tag_for_update_function_input import ( - TagForUpdateFunctionInput, -) from veadk.cli.services.veapig.apig import APIGateway from veadk.utils.logger import get_logger from veadk.utils.misc import formatted_timestamp @@ -217,6 +214,77 @@ def _list_application(self): ) return response["Result"]["Items"] + def _update_function_code( + self, + application_name: str, # application name + path: str, + ) -> tuple[str, str, str]: + """Update existing application function code while preserving URL. + + Args: + application_name (str): Application name to update. + path (str): Local project path. + + Returns: + tuple[str, str, str]: URL, app_id, function_id + """ + # Naming check + if "_" in application_name: + raise ValueError("Function or Application name cannot contain '_'.") + + # Find existing application + app_id = self.find_app_id_by_name(application_name) + if not app_id: + raise ValueError( + f"Application '{application_name}' not found. Use deploy() for new applications." + ) + + # Get application status and extract function info + status, full_response = self._get_application_status(app_id) + if status == "deploy_fail": + raise ValueError( + f"Cannot update failed application. Current status: {status}" + ) + + # Extract function name from application config + cloud_resource = full_response["Result"]["CloudResource"] + cloud_resource = json.loads(cloud_resource) + function_name = cloud_resource["framework"]["function"]["Name"] + # existing_url = cloud_resource["framework"]["url"]["system_url"] + function_id = cloud_resource["framework"]["function"]["Id"] + if not function_id: + raise ValueError(f"Function '{function_name}' not found for update") + + logger.info( + f"Start to update VeFaaS function {function_name} with path {path}." + ) + + # Upload and mount code using extracted method + self._upload_and_mount_code(function_id, path) + + # Use update_function client method to apply changes + self.client.update_function( + volcenginesdkvefaas.UpdateFunctionRequest( + id=function_id, + request_timeout=1800, # Keep same timeout as deploy + ) + ) + + logger.info(f"Function updated successfully: {function_id}") + + logger.info(f"VeFaaS function {function_name} with ID {function_id} updated.") + + # Release the application to apply changes + url = self._release_application(app_id) + + logger.info(f"VeFaaS application {application_name} with ID {app_id} released.") + + logger.info( + f"VeFaaS application {application_name} with ID {app_id} updated on {url}." + ) + + return url, app_id, function_id + def find_app_id_by_name(self, name: str): apps = self._list_application() for app in apps: @@ -314,76 +382,3 @@ def deploy( logger.info(f"VeFaaS application {name} with ID {app_id} deployed on {url}.") return url, app_id, function_id - - def _update_function_code( - self, - application_name: str, # application name - path: str, - ) -> tuple[str, str, str]: - """Update existing application function code while preserving URL. - - Args: - application_name (str): Application name to update. - path (str): Local project path. - - Returns: - tuple[str, str, str]: URL, app_id, function_id - """ - # Naming check - if "_" in application_name: - raise ValueError("Function or Application name cannot contain '_'.") - - # Find existing application - app_id = self.find_app_id_by_name(application_name) - if not app_id: - raise ValueError( - f"Application '{application_name}' not found. Use deploy() for new applications." - ) - - # Get application status and extract function info - status, full_response = self._get_application_status(app_id) - if status == "deploy_fail": - raise ValueError( - f"Cannot update failed application. Current status: {status}" - ) - - # Extract function name from application config - cloud_resource = full_response["Result"]["CloudResource"] - cloud_resource = json.loads(cloud_resource) - function_name = cloud_resource["framework"]["function"]["Name"] - # existing_url = cloud_resource["framework"]["url"]["system_url"] - function_id = cloud_resource["framework"]["function"]["Id"] - if not function_id: - raise ValueError(f"Function '{function_name}' not found for update") - - logger.info( - f"Start to update VeFaaS function {function_name} with path {path}." - ) - - # Upload and mount code using extracted method - self._upload_and_mount_code(function_id, path) - - # Use update_function client method to apply changes - self.client.update_function( - volcenginesdkvefaas.UpdateFunctionRequest( - id=function_id, - description="Updated by VeADK (Volcengine Agent Development Kit)", - tags=[TagForUpdateFunctionInput(key="provider", value="veadk")], - request_timeout=1800, # Keep same timeout as deploy - ) - ) - - logger.info(f"Function updated successfully: {function_id}") - - logger.info(f"VeFaaS function {function_name} with ID {function_id} updated.") - - # Release the application to apply changes - url = self._release_application(app_id) - - logger.info(f"VeFaaS application {application_name} with ID {app_id} released.") - - logger.info( - f"VeFaaS application {application_name} with ID {app_id} updated on {url}." - ) - - return url, app_id, function_id