diff --git a/veadk/database/database_adapter.py b/veadk/database/database_adapter.py index 28554787..0c0a5f19 100644 --- a/veadk/database/database_adapter.py +++ b/veadk/database/database_adapter.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import re import time from typing import BinaryIO, TextIO diff --git a/veadk/database/vector/opensearch_vector_database.py b/veadk/database/vector/opensearch_vector_database.py index e6a7763f..9cdf7290 100644 --- a/veadk/database/vector/opensearch_vector_database.py +++ b/veadk/database/vector/opensearch_vector_database.py @@ -186,7 +186,7 @@ def query(self, query: str, **kwargs: Any) -> list[str]: assert collection_name is not None, "Collection name is required." if not self._opensearch_client.indices.exists(index=collection_name): logger.warning( - f"querying {query}, but collection {collection_name} does not exist. retun a empty list." + f"querying {query}, but collection {collection_name} does not exist. return a empty list." ) return [] query_vector = self._embedding_client.embed_query(query) @@ -196,10 +196,58 @@ def query(self, query: str, **kwargs: Any) -> list[str]: @override def delete(self, collection_name: str, **kwargs: Any): + """drop index""" if not self._opensearch_client.indices.exists(index=collection_name): raise ValueError(f"Collection {collection_name} does not exist.") self._opensearch_client.indices.delete(index=collection_name) - def is_empty(self, collection_name: str): + def is_empty(self, collection_name: str) -> bool: response = self._opensearch_client.count(index=collection_name) return response["count"] == 0 + + def collection_exists(self, collection_name: str) -> bool: + return self._opensearch_client.indices.exists(index=collection_name) + + def list_all_collection(self) -> list: + """List all index name of OpenSearch.""" + response = self._opensearch_client.indices.get_alias() + return list(response.keys()) + + def get_all_docs(self, collection_name: str, size: int = 10000) -> list[dict]: + """Match all docs in one index of OpenSearch""" + if not self.collection_exists(collection_name): + logger.warning( + f"Get all docs, but collection {collection_name} does not exist. return a empty list." + ) + return [] + + query = {"size": size, "query": {"match_all": {}}} + response = self._opensearch_client.search(index=collection_name, body=query) + return [ + { + "id": hit["_id"], + "page_content": hit["_source"]["page_content"], + } + for hit in response["hits"]["hits"] + ] + + def delete_by_query(self, collection_name: str, query: str): + """Delete docs by query in one index of OpenSearch""" + if not self.collection_exists(collection_name): + raise ValueError(f"Collection {collection_name} does not exist.") + + query = {"query": {"match": {"page_content": query}}} + response = self._opensearch_client.delete_by_query( + index=collection_name, body=query + ) + self._opensearch_client.indices.refresh(index=collection_name) + return response + + def delete_by_id(self, collection_name: str, id: str): + """Delete docs by id in index of OpenSearch""" + if not self.collection_exists(collection_name): + raise ValueError(f"Collection {collection_name} does not exist.") + + response = self._opensearch_client.delete(index=collection_name, id=id) + self._opensearch_client.indices.refresh(index=collection_name) + return response diff --git a/veadk/memory/long_term_memory.py b/veadk/memory/long_term_memory.py index 1a5ae0a9..acf5669c 100644 --- a/veadk/memory/long_term_memory.py +++ b/veadk/memory/long_term_memory.py @@ -81,7 +81,7 @@ def _filter_and_convert_events(self, events: list[Event]) -> list[str]: # convert: to string-format for storage message = event.content.model_dump(exclude_none=True, mode="json") - final_events.append(json.dumps(message)) + final_events.append(json.dumps(message, ensure_ascii=False)) return final_events @override