Chat_QnA_v2 / cosmos_db.py
binh99's picture
update cosmos db
a4b89be
from azure.cosmos import CosmosClient, PartitionKey
from config import ENDPOINT, CREDENTIAL, DATABASE, CONTAINER_COSMOS
from datetime import date, datetime
import json
_client = CosmosClient(
url=ENDPOINT,
credential=CREDENTIAL,
)
database = _client.create_database_if_not_exists(DATABASE)
_container = database.create_container_if_not_exists(
CONTAINER_COSMOS,
partition_key=PartitionKey("/user_id")
)
def json_serial(obj):
"""JSON serializer for objects not serializable by default json code"""
if isinstance(obj, (datetime, date)):
return obj.isoformat()
raise TypeError ("Type %s not serializable" % type(obj))
def upsert_item(user_id, file_name, history, chatbot):
response = _container.upsert_item(
body={
"id": file_name,
"user_id": user_id,
"date": json.dumps(datetime.utcnow(), default=json_serial),
"history": history,
"chatbot": chatbot
}
)
message = f'Upsert {file_name} succesfully'
return message
def read_item(user_id, file_name):
response = _container.read_item(item=file_name, partition_key=user_id)
return response
def query_items(user_id, file_name):
response = list(_container.query_items(
query="SELECT * FROM r WHERE r.user_id=@user_id AND r.id=@id",
parameters=[
{"name": "@user_id", "value": user_id}, {"name": "@id", "value": file_name}
],
enable_cross_partition_query=True
))
return response
def query_item(user_id):
response = list(_container.query_items(
query="SELECT * FROM r WHERE r.user_id=@user_id",
parameters=[
{"name": "@user_id", "value": user_id}
],
enable_cross_partition_query=True
))
return response
def delete_items(user_id, file_name):
response = _container.delete_item(item=file_name, partition_key=user_id)
message = f'Delete {file_name} succesfully'
return message
if __name__ == '__main__':
mes = query_item("khanh")
docs = [m["id"] for m in mes]
print(docs)