From 18d778f4b225b8ce8c693f19e6dab14f3ceba93b Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Wed, 3 Sep 2025 18:20:33 +0800 Subject: [PATCH 1/3] feat: add delete_doc and list_docs --- veadk/database/database_adapter.py | 51 +++++++++++++++ veadk/database/local_database.py | 23 +++++-- .../vector/opensearch_vector_database.py | 8 ++- veadk/database/viking/viking_database.py | 65 +++++++++++++++++++ veadk/knowledgebase/knowledgebase.py | 8 +++ 5 files changed, 148 insertions(+), 7 deletions(-) diff --git a/veadk/database/database_adapter.py b/veadk/database/database_adapter.py index 49b60f29..4002a6c6 100644 --- a/veadk/database/database_adapter.py +++ b/veadk/database/database_adapter.py @@ -54,6 +54,12 @@ def query(self, query: str, index: str, top_k: int = 0) -> list: logger.error(f"Failed to search from Redis: index={index} error={e}") raise e + def delete_docs(self, index: str, ids: list[int]): ... + + def list_docs( + self, index: str, offset: int = 0, limit: int = 100 + ) -> list[dict]: ... + class RelationalDatabaseAdapter: def __init__(self, client): @@ -108,6 +114,12 @@ def query(self, query: str, index: str, top_k: int) -> list[str]: return [item["data"] for item in results] + def delete_docs(self, index: str, ids: list[int]): ... + + def list_docs( + self, index: str, offset: int = 0, limit: int = 100 + ) -> list[dict]: ... + class VectorDatabaseAdapter: def __init__(self, client): @@ -152,6 +164,23 @@ def query(self, query: str, index: str, top_k: int) -> list[str]: top_k=top_k, ) + def delete_doc(self, index: str, id: str) -> bool: + self._validate_index(index) + logger.debug(f"Deleting documents from vector database: index={index} id={id}") + try: + self.client.delete_by_id(collection_name=index, id=id) + return True + except Exception as e: + logger.error( + f"Failed to delete document from vector database: index={index} id={id} error={e}" + ) + return False + + def list_docs(self, index: str, offset: int = 0, limit: int = 1000) -> list[dict]: + self._validate_index(index) + logger.debug(f"Listing documents from vector database: index={index}") + return self.client.list_docs(collection_name=index, offset=offset, limit=limit) + class VikingDatabaseAdapter: def __init__(self, client): @@ -212,6 +241,16 @@ def query(self, query: str, index: str, top_k: int) -> list[str]: return self.client.query(query, collection_name=index, top_k=top_k) + def delete_doc(self, index: str, id: str) -> bool: + self._validate_index(index) + logger.debug(f"Deleting documents from vector database: index={index} id={id}") + return self.client.delete_by_id(collection_name=index, id=id) + + def list_docs(self, index: str, offset: int, limit: int) -> list[dict]: + self._validate_index(index) + logger.debug(f"Listing documents from vector database: index={index}") + return self.client.list_docs(collection_name=index, offset=offset, limit=limit) + class VikingMemoryDatabaseAdapter: def __init__(self, client): @@ -248,6 +287,12 @@ def query(self, query: str, index: str, top_k: int, **kwargs): result = self.client.query(query, collection_name=index, top_k=top_k, **kwargs) return result + def delete_docs(self, index: str, ids: list[int]): + raise NotImplementedError("VikingMemoryDatabase does not support delete_docs") + + def list_docs(self, index: str): + raise NotImplementedError("VikingMemoryDatabase does not support list_docs") + class LocalDatabaseAdapter: def __init__(self, client): @@ -261,6 +306,12 @@ def add(self, data: list[str], **kwargs): def query(self, query: str, **kwargs): return self.client.query(query, **kwargs) + def delete_doc(self, index: str, id: str) -> bool: + return self.client.delete_doc(id) + + def list_docs(self, index: str, offset: int = 0, limit: int = 100) -> list[dict]: + return self.client.list_docs(offset=offset, limit=limit) + MAPPING = { "RedisDatabase": KVDatabaseAdapter, diff --git a/veadk/database/local_database.py b/veadk/database/local_database.py index da80c748..f0cdd45e 100644 --- a/veadk/database/local_database.py +++ b/veadk/database/local_database.py @@ -24,20 +24,35 @@ class LocalDataBase(BaseDatabase): def __init__(self, **kwargs): super().__init__() - self.data = [] + self.data = {} # 改为字典 self._type = "local" + self._next_id = 0 # 用于生成唯一ID def add_texts(self, texts: list[str], **kwargs): - self.data.extend(texts) + for text in texts: + self.data[str(self._next_id)] = text + self._next_id += 1 def is_empty(self): return len(self.data) == 0 def query(self, query: str, **kwargs: Any) -> list[str]: - return self.data + return list(self.data.values()) def delete(self, **kwargs: Any): - self.data = [] + self.data = {} def add(self, texts: list[str], **kwargs: Any): return self.add_texts(texts) + + def list_docs(self, **kwargs: Any) -> list[dict]: + return [{"id": id, "content": content} for id, content in self.data.items()] + + def delete_doc(self, id: str, **kwargs: Any): + if id not in self.data: + raise ValueError(f"id {id} not found") + try: + del self.data[id] + return True + except Exception: + return False diff --git a/veadk/database/vector/opensearch_vector_database.py b/veadk/database/vector/opensearch_vector_database.py index f3e45110..68061e2d 100644 --- a/veadk/database/vector/opensearch_vector_database.py +++ b/veadk/database/vector/opensearch_vector_database.py @@ -219,7 +219,9 @@ def list_all_collection(self) -> list: response = self._opensearch_client.indices.get_alias() return list(response.keys()) - def get_all_docs(self, collection_name: str, size: int = 10000) -> list[dict]: + def list_docs( + self, collection_name: str, offset: int = 0, limit: int = 10000 + ) -> list[dict]: """Match all docs in one index of OpenSearch""" if not self.collection_exists(collection_name): logger.warning( @@ -227,12 +229,12 @@ def get_all_docs(self, collection_name: str, size: int = 10000) -> list[dict]: ) return [] - query = {"size": size, "query": {"match_all": {}}} + query = {"size": limit, "from": offset, "query": {"match_all": {}}} response = self._opensearch_client.search(index=collection_name, body=query) return [ { "id": hit["_id"], - "page_content": hit["_source"]["page_content"], + "content": hit["_source"]["page_content"], } for hit in response["hits"]["hits"] ] diff --git a/veadk/database/viking/viking_database.py b/veadk/database/viking/viking_database.py index 25a7039c..61a3048a 100644 --- a/veadk/database/viking/viking_database.py +++ b/veadk/database/viking/viking_database.py @@ -41,6 +41,8 @@ doc_add_path = "/api/knowledge/doc/add" doc_info_path = "/api/knowledge/doc/info" doc_del_path = "/api/collection/drop" +list_docs_path = "/api/knowledge/point/list" +delete_docs_path = "/api/knowledge/point/delete" class VolcengineTOSConfig(BaseModel): @@ -400,3 +402,66 @@ def collection_exists(self, collection_name: str) -> bool: return True else: return False + + def list_docs( + self, collection_name: str, offset: int = 0, limit: int = -1 + ) -> list[dict]: + request_params = { + "collection_name": collection_name, + "project": self.config.project, + "offset": offset, + "limit": limit, + } + + create_collection_req = prepare_request( + method="POST", + path=list_docs_path, + config=self.config, + data=request_params, + ) + resp = requests.request( + method=create_collection_req.method, + url="https://{}{}".format( + g_knowledge_base_domain, create_collection_req.path + ), + headers=create_collection_req.headers, + data=create_collection_req.body, + ) + + result = resp.json() + if result["code"] != 0: + logger.error(f"Error in list_docs: {result['message']}") + raise ValueError(f"Error in list_docs: {result['message']}") + + data = [ + {"id": res["point_id"], "content": res["content"]} + for res in result["data"]["point_list"] + ] + return data + + def delete_by_id(self, collection_name: str, id: str) -> bool: + request_params = { + "collection_name": collection_name, + "project": self.config.project, + "point_id": id, + } + + create_collection_req = prepare_request( + method="POST", + path=delete_docs_path, + config=self.config, + data=request_params, + ) + resp = requests.request( + method=create_collection_req.method, + url="https://{}{}".format( + g_knowledge_base_domain, create_collection_req.path + ), + headers=create_collection_req.headers, + data=create_collection_req.body, + ) + + result = resp.json() + if result["code"] != 0: + return False + return True diff --git a/veadk/knowledgebase/knowledgebase.py b/veadk/knowledgebase/knowledgebase.py index f9e92556..10955075 100644 --- a/veadk/knowledgebase/knowledgebase.py +++ b/veadk/knowledgebase/knowledgebase.py @@ -80,3 +80,11 @@ def search(self, query: str, app_name: str, top_k: int | None = None) -> list[st if len(result) == 0: logger.warning(f"No documents found in knowledgebase. Query: {query}") return result + + def delete_doc(self, app_name: str, id: str) -> bool: + index = build_knowledgebase_index(app_name) + return self.adapter.delete_doc(index=index, id=id) + + def list_docs(self, app_name: str, offset: int = 0, limit: int = 100) -> list[dict]: + index = build_knowledgebase_index(app_name) + return self.adapter.list_docs(index=index, offset=offset, limit=limit) From a8f44a9a69cd6a643f69107c422302bd835bf394 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Wed, 3 Sep 2025 18:31:50 +0800 Subject: [PATCH 2/3] fix: remove redundant characters --- veadk/database/local_database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/veadk/database/local_database.py b/veadk/database/local_database.py index f0cdd45e..172753a6 100644 --- a/veadk/database/local_database.py +++ b/veadk/database/local_database.py @@ -24,9 +24,9 @@ class LocalDataBase(BaseDatabase): def __init__(self, **kwargs): super().__init__() - self.data = {} # 改为字典 + self.data = {} self._type = "local" - self._next_id = 0 # 用于生成唯一ID + self._next_id = 0 # Used to generate unique IDs def add_texts(self, texts: list[str], **kwargs): for text in texts: From b2f5a3c8a225c5230e2bba106ceb61f6c8ef0f62 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Thu, 4 Sep 2025 10:42:55 +0800 Subject: [PATCH 3/3] feat: add mysql and redis part --- veadk/database/database_adapter.py | 53 ++++++++++++++++--- veadk/database/kv/redis_database.py | 44 ++++++++++++++++ veadk/database/relational/mysql_database.py | 57 +++++++++++++++++++++ 3 files changed, 146 insertions(+), 8 deletions(-) diff --git a/veadk/database/database_adapter.py b/veadk/database/database_adapter.py index 4002a6c6..50084e9c 100644 --- a/veadk/database/database_adapter.py +++ b/veadk/database/database_adapter.py @@ -54,11 +54,32 @@ def query(self, query: str, index: str, top_k: int = 0) -> list: logger.error(f"Failed to search from Redis: index={index} error={e}") raise e - def delete_docs(self, index: str, ids: list[int]): ... + def delete_doc(self, index: str, id: str) -> bool: + logger.debug(f"Deleting document from Redis database: index={index} id={id}") + try: + # For Redis, we need to handle deletion differently since RedisDatabase.delete_doc + # takes a key and a single id + result = self.client.delete_doc(key=index, id=id) + return result + except Exception as e: + logger.error( + f"Failed to delete document from Redis database: index={index} id={id} error={e}" + ) + return False - def list_docs( - self, index: str, offset: int = 0, limit: int = 100 - ) -> list[dict]: ... + def list_docs(self, index: str, offset: int = 0, limit: int = 100) -> list[dict]: + logger.debug(f"Listing documents from Redis database: index={index}") + try: + # Get all documents from Redis + docs = self.client.list_docs(key=index) + + # Apply offset and limit for pagination + return docs[offset : offset + limit] + except Exception as e: + logger.error( + f"Failed to list documents from Redis database: index={index} error={e}" + ) + return [] class RelationalDatabaseAdapter: @@ -114,11 +135,27 @@ def query(self, query: str, index: str, top_k: int) -> list[str]: return [item["data"] for item in results] - def delete_docs(self, index: str, ids: list[int]): ... + def delete_doc(self, index: str, id: str) -> bool: + logger.debug(f"Deleting document from SQL database: table_name={index} id={id}") + try: + # Convert single id to list for the client method + result = self.client.delete_doc(table=index, ids=[int(id)]) + return result + except Exception as e: + logger.error( + f"Failed to delete document from SQL database: table_name={index} id={id} error={e}" + ) + return False - def list_docs( - self, index: str, offset: int = 0, limit: int = 100 - ) -> list[dict]: ... + def list_docs(self, index: str, offset: int = 0, limit: int = 100) -> list[dict]: + logger.debug(f"Listing documents from SQL database: table_name={index}") + try: + return self.client.list_docs(table=index, offset=offset, limit=limit) + except Exception as e: + logger.error( + f"Failed to list documents from SQL database: table_name={index} error={e}" + ) + return [] class VectorDatabaseAdapter: diff --git a/veadk/database/kv/redis_database.py b/veadk/database/kv/redis_database.py index c0aa2bef..1173615c 100644 --- a/veadk/database/kv/redis_database.py +++ b/veadk/database/kv/redis_database.py @@ -110,3 +110,47 @@ def delete(self, **kwargs): except Exception as e: logger.error(f"Failed to delete key `{key}`: {e}") raise e + + def delete_doc(self, key: str, id: str) -> bool: + """Delete a specific document by ID from a Redis list. + + Args: + key: The Redis key (list) to delete from + id: The ID of the document to delete + + Returns: + bool: True if deletion was successful, False otherwise + """ + try: + # Get all items in the list + items = self._client.lrange(key, 0, -1) + + # Find the index of the item to delete + for i, item in enumerate(items): + # Assuming the item is stored as a JSON string with an 'id' field + # If it's just the content, we'll use the list index as ID + if str(i) == id: + self._client.lrem(key, 1, item) + return True + + logger.warning(f"Document with id {id} not found in key {key}") + return False + except Exception as e: + logger.error(f"Failed to delete document with id {id} from key {key}: {e}") + return False + + def list_docs(self, key: str) -> list[dict]: + """List all documents in a Redis list. + + Args: + key: The Redis key (list) to list documents from + + Returns: + list[dict]: List of documents with id and content + """ + try: + items = self._client.lrange(key, 0, -1) + return [{"id": str(i), "content": item} for i, item in enumerate(items)] + except Exception as e: + logger.error(f"Failed to list documents from key {key}: {e}") + return [] diff --git a/veadk/database/relational/mysql_database.py b/veadk/database/relational/mysql_database.py index 3ca1beb8..9b9eda5c 100644 --- a/veadk/database/relational/mysql_database.py +++ b/veadk/database/relational/mysql_database.py @@ -111,5 +111,62 @@ def delete(self, **kwargs): logger.error(f"Failed to drop table {table}: {e}") raise e + def delete_doc(self, table: str, ids: list[int]) -> bool: + """Delete documents by IDs from a MySQL table. + + Args: + table: The table name to delete from + ids: List of document IDs to delete + + Returns: + bool: True if deletion was successful, False otherwise + """ + if not self.table_exists(table): + logger.warning(f"Table {table} does not exist. Skipping delete operation.") + return False + + if not ids: + return True # Nothing to delete + + try: + with self._connection.cursor() as cursor: + # Create placeholders for the IDs + placeholders = ",".join(["%s"] * len(ids)) + sql = f"DELETE FROM `{table}` WHERE id IN ({placeholders})" + cursor.execute(sql, ids) + self._connection.commit() + logger.info(f"Deleted {cursor.rowcount} documents from table {table}") + return True + except Exception as e: + logger.error(f"Failed to delete documents from table {table}: {e}") + return False + + def list_docs(self, table: str, offset: int = 0, limit: int = 100) -> list[dict]: + """List documents from a MySQL table. + + Args: + table: The table name to list documents from + offset: Offset for pagination + limit: Limit for pagination + + Returns: + list[dict]: List of documents with id and content + """ + if not self.table_exists(table): + logger.warning(f"Table {table} does not exist. Returning empty list.") + return [] + + try: + with self._connection.cursor() as cursor: + sql = f"SELECT id, data FROM `{table}` ORDER BY created_at DESC LIMIT %s OFFSET %s" + cursor.execute(sql, (limit, offset)) + results = cursor.fetchall() + return [ + {"id": str(row["id"]), "content": row["data"]} for row in results + ] + except Exception as e: + logger.error(f"Failed to list documents from table {table}: {e}") + return [] + def is_empty(self): pass