Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions veadk/database/database_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,33 @@ def query(self, query: str, index: str, top_k: int = 0) -> list:
logger.error(f"Failed to search from Redis: index={index} error={e}")
raise e

def delete_doc(self, index: str, id: str) -> bool:
logger.debug(f"Deleting document from Redis database: index={index} id={id}")
try:
# For Redis, we need to handle deletion differently since RedisDatabase.delete_doc
# takes a key and a single id
result = self.client.delete_doc(key=index, id=id)
return result
except Exception as e:
logger.error(
f"Failed to delete document from Redis database: index={index} id={id} error={e}"
)
return False

def list_docs(self, index: str, offset: int = 0, limit: int = 100) -> list[dict]:
logger.debug(f"Listing documents from Redis database: index={index}")
try:
# Get all documents from Redis
docs = self.client.list_docs(key=index)

# Apply offset and limit for pagination
return docs[offset : offset + limit]
except Exception as e:
logger.error(
f"Failed to list documents from Redis database: index={index} error={e}"
)
return []


class RelationalDatabaseAdapter:
def __init__(self, client):
Expand Down Expand Up @@ -108,6 +135,28 @@ def query(self, query: str, index: str, top_k: int) -> list[str]:

return [item["data"] for item in results]

def delete_doc(self, index: str, id: str) -> bool:
logger.debug(f"Deleting document from SQL database: table_name={index} id={id}")
try:
# Convert single id to list for the client method
result = self.client.delete_doc(table=index, ids=[int(id)])
return result
except Exception as e:
logger.error(
f"Failed to delete document from SQL database: table_name={index} id={id} error={e}"
)
return False

def list_docs(self, index: str, offset: int = 0, limit: int = 100) -> list[dict]:
logger.debug(f"Listing documents from SQL database: table_name={index}")
try:
return self.client.list_docs(table=index, offset=offset, limit=limit)
except Exception as e:
logger.error(
f"Failed to list documents from SQL database: table_name={index} error={e}"
)
return []


class VectorDatabaseAdapter:
def __init__(self, client):
Expand Down Expand Up @@ -152,6 +201,23 @@ def query(self, query: str, index: str, top_k: int) -> list[str]:
top_k=top_k,
)

def delete_doc(self, index: str, id: str) -> bool:
self._validate_index(index)
logger.debug(f"Deleting documents from vector database: index={index} id={id}")
try:
self.client.delete_by_id(collection_name=index, id=id)
return True
except Exception as e:
logger.error(
f"Failed to delete document from vector database: index={index} id={id} error={e}"
)
return False

def list_docs(self, index: str, offset: int = 0, limit: int = 1000) -> list[dict]:
self._validate_index(index)
logger.debug(f"Listing documents from vector database: index={index}")
return self.client.list_docs(collection_name=index, offset=offset, limit=limit)


class VikingDatabaseAdapter:
def __init__(self, client):
Expand Down Expand Up @@ -212,6 +278,16 @@ def query(self, query: str, index: str, top_k: int) -> list[str]:

return self.client.query(query, collection_name=index, top_k=top_k)

def delete_doc(self, index: str, id: str) -> bool:
self._validate_index(index)
logger.debug(f"Deleting documents from vector database: index={index} id={id}")
return self.client.delete_by_id(collection_name=index, id=id)

def list_docs(self, index: str, offset: int, limit: int) -> list[dict]:
self._validate_index(index)
logger.debug(f"Listing documents from vector database: index={index}")
return self.client.list_docs(collection_name=index, offset=offset, limit=limit)


class VikingMemoryDatabaseAdapter:
def __init__(self, client):
Expand Down Expand Up @@ -248,6 +324,12 @@ def query(self, query: str, index: str, top_k: int, **kwargs):
result = self.client.query(query, collection_name=index, top_k=top_k, **kwargs)
return result

def delete_docs(self, index: str, ids: list[int]):
raise NotImplementedError("VikingMemoryDatabase does not support delete_docs")

def list_docs(self, index: str):
raise NotImplementedError("VikingMemoryDatabase does not support list_docs")


class LocalDatabaseAdapter:
def __init__(self, client):
Expand All @@ -261,6 +343,12 @@ def add(self, data: list[str], **kwargs):
def query(self, query: str, **kwargs):
return self.client.query(query, **kwargs)

def delete_doc(self, index: str, id: str) -> bool:
return self.client.delete_doc(id)

def list_docs(self, index: str, offset: int = 0, limit: int = 100) -> list[dict]:
return self.client.list_docs(offset=offset, limit=limit)


MAPPING = {
"RedisDatabase": KVDatabaseAdapter,
Expand Down
44 changes: 44 additions & 0 deletions veadk/database/kv/redis_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,47 @@ def delete(self, **kwargs):
except Exception as e:
logger.error(f"Failed to delete key `{key}`: {e}")
raise e

def delete_doc(self, key: str, id: str) -> bool:
"""Delete a specific document by ID from a Redis list.

Args:
key: The Redis key (list) to delete from
id: The ID of the document to delete

Returns:
bool: True if deletion was successful, False otherwise
"""
try:
# Get all items in the list
items = self._client.lrange(key, 0, -1)

# Find the index of the item to delete
for i, item in enumerate(items):
# Assuming the item is stored as a JSON string with an 'id' field
# If it's just the content, we'll use the list index as ID
if str(i) == id:
self._client.lrem(key, 1, item)
return True

logger.warning(f"Document with id {id} not found in key {key}")
return False
except Exception as e:
logger.error(f"Failed to delete document with id {id} from key {key}: {e}")
return False

def list_docs(self, key: str) -> list[dict]:
"""List all documents in a Redis list.

Args:
key: The Redis key (list) to list documents from

Returns:
list[dict]: List of documents with id and content
"""
try:
items = self._client.lrange(key, 0, -1)
return [{"id": str(i), "content": item} for i, item in enumerate(items)]
except Exception as e:
logger.error(f"Failed to list documents from key {key}: {e}")
return []
23 changes: 19 additions & 4 deletions veadk/database/local_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,35 @@ class LocalDataBase(BaseDatabase):

def __init__(self, **kwargs):
super().__init__()
self.data = []
self.data = {}
self._type = "local"
self._next_id = 0 # Used to generate unique IDs

def add_texts(self, texts: list[str], **kwargs):
self.data.extend(texts)
for text in texts:
self.data[str(self._next_id)] = text
self._next_id += 1

def is_empty(self):
return len(self.data) == 0

def query(self, query: str, **kwargs: Any) -> list[str]:
return self.data
return list(self.data.values())

def delete(self, **kwargs: Any):
self.data = []
self.data = {}

def add(self, texts: list[str], **kwargs: Any):
return self.add_texts(texts)

def list_docs(self, **kwargs: Any) -> list[dict]:
return [{"id": id, "content": content} for id, content in self.data.items()]

def delete_doc(self, id: str, **kwargs: Any):
if id not in self.data:
raise ValueError(f"id {id} not found")
try:
del self.data[id]
return True
except Exception:
return False
57 changes: 57 additions & 0 deletions veadk/database/relational/mysql_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,62 @@ def delete(self, **kwargs):
logger.error(f"Failed to drop table {table}: {e}")
raise e

def delete_doc(self, table: str, ids: list[int]) -> bool:
"""Delete documents by IDs from a MySQL table.

Args:
table: The table name to delete from
ids: List of document IDs to delete

Returns:
bool: True if deletion was successful, False otherwise
"""
if not self.table_exists(table):
logger.warning(f"Table {table} does not exist. Skipping delete operation.")
return False

if not ids:
return True # Nothing to delete

try:
with self._connection.cursor() as cursor:
# Create placeholders for the IDs
placeholders = ",".join(["%s"] * len(ids))
sql = f"DELETE FROM `{table}` WHERE id IN ({placeholders})"
cursor.execute(sql, ids)
self._connection.commit()
logger.info(f"Deleted {cursor.rowcount} documents from table {table}")
return True
except Exception as e:
logger.error(f"Failed to delete documents from table {table}: {e}")
return False

def list_docs(self, table: str, offset: int = 0, limit: int = 100) -> list[dict]:
"""List documents from a MySQL table.

Args:
table: The table name to list documents from
offset: Offset for pagination
limit: Limit for pagination

Returns:
list[dict]: List of documents with id and content
"""
if not self.table_exists(table):
logger.warning(f"Table {table} does not exist. Returning empty list.")
return []

try:
with self._connection.cursor() as cursor:
sql = f"SELECT id, data FROM `{table}` ORDER BY created_at DESC LIMIT %s OFFSET %s"
cursor.execute(sql, (limit, offset))
results = cursor.fetchall()
return [
{"id": str(row["id"]), "content": row["data"]} for row in results
]
except Exception as e:
logger.error(f"Failed to list documents from table {table}: {e}")
return []

def is_empty(self):
pass
8 changes: 5 additions & 3 deletions veadk/database/vector/opensearch_vector_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,20 +219,22 @@ def list_all_collection(self) -> list:
response = self._opensearch_client.indices.get_alias()
return list(response.keys())

def get_all_docs(self, collection_name: str, size: int = 10000) -> list[dict]:
def list_docs(
self, collection_name: str, offset: int = 0, limit: int = 10000
) -> list[dict]:
"""Match all docs in one index of OpenSearch"""
if not self.collection_exists(collection_name):
logger.warning(
f"Get all docs, but collection {collection_name} does not exist. return a empty list."
)
return []

query = {"size": size, "query": {"match_all": {}}}
query = {"size": limit, "from": offset, "query": {"match_all": {}}}
response = self._opensearch_client.search(index=collection_name, body=query)
return [
{
"id": hit["_id"],
"page_content": hit["_source"]["page_content"],
"content": hit["_source"]["page_content"],
}
for hit in response["hits"]["hits"]
]
Expand Down
65 changes: 65 additions & 0 deletions veadk/database/viking/viking_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
doc_add_path = "/api/knowledge/doc/add"
doc_info_path = "/api/knowledge/doc/info"
doc_del_path = "/api/collection/drop"
list_docs_path = "/api/knowledge/point/list"
delete_docs_path = "/api/knowledge/point/delete"


class VolcengineTOSConfig(BaseModel):
Expand Down Expand Up @@ -400,3 +402,66 @@ def collection_exists(self, collection_name: str) -> bool:
return True
else:
return False

def list_docs(
self, collection_name: str, offset: int = 0, limit: int = -1
) -> list[dict]:
request_params = {
"collection_name": collection_name,
"project": self.config.project,
"offset": offset,
"limit": limit,
}

create_collection_req = prepare_request(
method="POST",
path=list_docs_path,
config=self.config,
data=request_params,
)
resp = requests.request(
method=create_collection_req.method,
url="https://{}{}".format(
g_knowledge_base_domain, create_collection_req.path
),
headers=create_collection_req.headers,
data=create_collection_req.body,
)

result = resp.json()
if result["code"] != 0:
logger.error(f"Error in list_docs: {result['message']}")
raise ValueError(f"Error in list_docs: {result['message']}")

data = [
{"id": res["point_id"], "content": res["content"]}
for res in result["data"]["point_list"]
]
return data

def delete_by_id(self, collection_name: str, id: str) -> bool:
request_params = {
"collection_name": collection_name,
"project": self.config.project,
"point_id": id,
}

create_collection_req = prepare_request(
method="POST",
path=delete_docs_path,
config=self.config,
data=request_params,
)
resp = requests.request(
method=create_collection_req.method,
url="https://{}{}".format(
g_knowledge_base_domain, create_collection_req.path
),
headers=create_collection_req.headers,
data=create_collection_req.body,
)

result = resp.json()
if result["code"] != 0:
return False
return True
8 changes: 8 additions & 0 deletions veadk/knowledgebase/knowledgebase.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,11 @@ def search(self, query: str, app_name: str, top_k: int | None = None) -> list[st
if len(result) == 0:
logger.warning(f"No documents found in knowledgebase. Query: {query}")
return result

def delete_doc(self, app_name: str, id: str) -> bool:
index = build_knowledgebase_index(app_name)
return self.adapter.delete_doc(index=index, id=id)

def list_docs(self, app_name: str, offset: int = 0, limit: int = 100) -> list[dict]:
index = build_knowledgebase_index(app_name)
return self.adapter.list_docs(index=index, offset=offset, limit=limit)