Spaces:
Sleeping
Sleeping
import streamlit as st | |
import psycopg2 | |
from psycopg2 import extras | |
from datetime import datetime | |
import logging | |
import json | |
import pandas as pd | |
from typing import List, Dict, Tuple | |
import os | |
import sys | |
# Configuration du logging | |
logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler()]) | |
logger = logging.getLogger(__name__) | |
sys.stdout.reconfigure(encoding='utf-8') | |
# Configuration de la base de données | |
db_config = { | |
"database": st.secrets["DB_NAME"], | |
"user": st.secrets["DB_USER"], | |
"password": st.secrets["DB_PASSWORD"], | |
"host": st.secrets["DB_HOST"], | |
"port": st.secrets["DB_PORT"] | |
} | |
######################### CLASSES ######################### | |
class DBManager: | |
def __init__(self, db_config: Dict, schema_file: str): | |
""" | |
Initialise la connexion à la base PostgreSQL et charge le schéma. | |
:param db_config: Dictionnaire avec les informations de connexion (host, database, user, password). | |
:param schema_file: Chemin vers le fichier JSON contenant le schéma de la base. | |
""" | |
self.db_config = db_config | |
self.schema_file = schema_file | |
self.connection = None | |
self.cursor = None | |
self._load_schema() | |
self._connect_to_database() | |
self._create_database() | |
self.cursor.execute("SET NAMES 'UTF8'") | |
def _load_schema(self): | |
"""Charge le schéma de base de données depuis un fichier JSON.""" | |
if not os.path.exists(self.schema_file): | |
raise FileNotFoundError(f"Fichier non trouvé : {self.schema_file}") | |
with open(self.schema_file, "r", encoding="utf-8") as file: | |
self.schema = json.load(file) | |
def _connect_to_database(self): | |
"""Établit une connexion avec la base PostgreSQL.""" | |
try: | |
self.connection = psycopg2.connect(**self.db_config, cursor_factory=extras.DictCursor) | |
self.cursor = self.connection.cursor() | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
def _create_database(self): | |
"""Crée les tables définies dans le schéma JSON.""" | |
for table_name, table_info in self.schema['tables'].items(): | |
create_table_query = self._generate_create_table_query(table_name, table_info['columns']) | |
try: | |
self.cursor.execute(create_table_query) | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
finally: | |
self.connection.commit() | |
def _generate_create_table_query(self, table_name: str, columns: List[Dict]) -> str: | |
"""Génère une requête SQL pour créer une table en fonction du schéma.""" | |
column_definitions = [] | |
for column in columns: | |
column_definition = f"{column['name']} {column['type']}" | |
if 'constraints' in column and column['constraints']: | |
column_definition += " " + " ".join(column['constraints']) | |
column_definitions.append(column_definition) | |
columns_str = ", ".join(column_definitions) | |
return f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_str});" | |
def insert_data_from_dict(self, table_name: str, data: List[Dict], id_column: str) -> List[int]: | |
"""Insère des données dans une table à partir d'une liste de dictionnaires et retourne les IDs insérés. | |
table_name : str : nom de la table | |
data : List[Dict] : données à insérer | |
id_column : str : nom de la colonne d'ID à retourner | |
""" | |
columns = ", ".join(data[0].keys()) | |
placeholders = ", ".join(['%s' for _ in data[0].keys()]) | |
# Requête pour insérer les données et retourner l'ID dynamique | |
query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders}) RETURNING {id_column}" | |
ids = [] | |
try: | |
for row in data: | |
self.cursor.execute(query, tuple(row.values())) | |
inserted_id = self.cursor.fetchone()[0] | |
ids.append(inserted_id) | |
return ids | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
finally: | |
self.connection.commit() | |
def insert_data_from_dict(self, table_name: str, data: List[Dict], id_column: str) -> List[int]: | |
"""Insère des données dans une table à partir d'une liste de dictionnaires et retourne les IDs insérés. | |
table_name : str : nom de la table | |
data : List[Dict] : données à insérer | |
id_column : str : nom de la colonne d'ID à retourner | |
""" | |
columns = ", ".join(data[0].keys()) | |
placeholders = ", ".join(['%s' for _ in data[0].keys()]) | |
# Requête pour insérer les données et retourner l'ID dynamique | |
query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders}) RETURNING {id_column}" | |
ids = [] # Liste pour stocker les IDs retournés | |
for row in data: | |
self.cursor.execute(query, tuple(row.values())) | |
inserted_id = self.cursor.fetchone()[0] # Récupère le premier (et unique) élément de la ligne retournée | |
ids.append(inserted_id) | |
self.connection.commit() | |
return ids | |
def insert_data_from_csv(self, table_name: str, csv_file: str) -> None: | |
"""Insère des données dans une table à partir d'un fichier CSV.""" | |
df = pd.read_csv(csv_file) | |
columns = df.columns.tolist() | |
placeholders = ", ".join(['%s' for _ in columns]) | |
query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})" | |
try: | |
for row in df.itertuples(index=False, name=None): | |
self.cursor.execute(query, row) | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
finally: | |
self.connection.commit() | |
def fetch_all(self, table_name: str) -> List[Tuple]: | |
"""Récupère toutes les données d'une table.""" | |
try: | |
self.cursor.execute(f"SELECT * FROM {table_name}") | |
return self.cursor.fetchall() | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
def execute_safe(self, query: str, params: Tuple = (), fetch: bool = False): | |
""" | |
Exécute une requête SQL avec gestion centralisée des erreurs. | |
:param query: Requête SQL à exécuter. | |
:param params: Paramètres de la requête. | |
:param fetch: Indique si les résultats doivent être récupérés. | |
:return: Résultats de la requête (si fetch est True), sinon None. | |
""" | |
try: | |
self.cursor.execute(query, params) | |
if fetch: | |
return self.cursor.fetchall() # Retourner les résultats si demandé | |
else: | |
self.connection.commit() # Valider les modifications | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
self.connection.rollback() # Annuler la transaction en cas d'erreur | |
raise RuntimeError(f"Erreur SQL : {err} | Query : {query} | Params : {params}") | |
def fetch_by_condition(self, table_name: str, condition: str, params: Tuple = ()) -> List[Tuple]: | |
"""Récupère les données d'une table avec une condition.""" | |
query = f"SELECT * FROM {table_name} WHERE {condition}" | |
try: | |
self.cursor.execute(query, params) | |
return self.execute_safe(query, params, fetch=True) | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
def update_data(self, table_name: str, set_clause: str, condition: str, params: Tuple) -> None: | |
"""Met à jour des données dans une table.""" | |
query = f"UPDATE {table_name} SET {set_clause} WHERE {condition}" | |
try: | |
self.cursor.execute(query, params) | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
finally: | |
self.connection.commit() | |
def delete_data(self, table_name: str, condition: str, params: Tuple) -> None: | |
"""Supprime des données d'une table selon une condition.""" | |
query = f"DELETE FROM {table_name} WHERE {condition}" | |
try: | |
self.cursor.execute(query, params) | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
finally: | |
self.connection.commit() | |
def close_connection(self) -> None: | |
"""Ferme la connexion à la base de données.""" | |
if self.connection: | |
self.cursor.close() | |
self.connection.close() | |
def create_index(self, table_name: str, column_name: str) -> None: | |
"""Crée un index sur une colonne spécifique pour améliorer les performances de recherche.""" | |
query = f"CREATE INDEX IF NOT EXISTS idx_{table_name}_{column_name} ON {table_name} ({column_name})" | |
try: | |
self.cursor.execute(query) | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
finally: | |
self.connection.commit() | |
def select(self, query: str, params: Tuple = ()) -> List[Tuple]: | |
"""Exécute une requête SELECT personnalisée et retourne les résultats.""" | |
try: | |
self.cursor.execute(query, params) | |
return self.cursor.fetchall() | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
def query(self, query, params=None): | |
""" | |
Exécute une requête SQL, en utilisant les paramètres fournis, | |
et retourne les résultats si nécessaire. | |
""" | |
try: | |
self.cursor.execute(query, params) | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
finally: | |
# Si la requête est un SELECT, récupérer les résultats | |
if query.strip().upper().startswith("SELECT"): | |
return self.cursor.fetchall() | |
else: # Si ce n'est pas un SELECT, ne rien retourner (utile pour INSERT/UPDATE) | |
self.connection.commit() | |
return None | |
######################### FONCTIONS ######################### | |
# Mettre DBManager en cache | |
def get_db_manager(): | |
return DBManager(db_config, os.path.join("server","db","schema.json")) | |
def save_message(db_manager, id_conversation: int, role: str, content: str,temps_traitement, total_cout, impact_eco) -> None: | |
""" | |
Sauvegarde un message dans la base de données, en associant l'utilisateur à la conversation. | |
:param db_manager: Instance de DBManager. | |
:param id_conversation: ID de la conversation associée. | |
:param role: Rôle de l'intervenant (ex. 'user' ou 'assistant'). | |
:param content: Contenu du message. | |
""" | |
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
data = [{ | |
"id_conversation": id_conversation, | |
"role": role, | |
"content": content, | |
"timestamp": timestamp, | |
"temps_traitement":temps_traitement, | |
"total_cout": total_cout, | |
"impact_eco": impact_eco | |
}] | |
try: | |
db_manager.insert_data_from_dict("messages", data, id_column="id_message") | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion: {err}") | |
return | |
def create_conversation(db_manager, title: str, id_utilisateur: int) -> int: | |
""" | |
Crée une nouvelle conversation dans la base de données, en associant l'utilisateur à la conversation. | |
:param db_manager: Instance de DBManager. | |
:param title: Titre de la conversation. | |
:param id_utilisateur: ID de l'utilisateur associé. | |
:return: ID de la conversation nouvellement créée. | |
""" | |
created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
data = [{ | |
"created_at": created_at, | |
"title": title, | |
"id_utilisateur": id_utilisateur, | |
}] | |
try: | |
result = db_manager.insert_data_from_dict("conversations", data, id_column="id_conversation") | |
return result[0] | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
def load_messages(db_manager, id_conversation: int) -> List[Dict]: | |
""" | |
Charge les messages associés à une conversation. | |
:param db_manager: Instance de DBManager. | |
:param id_conversation: ID de la conversation. | |
:return: Liste des messages sous forme de dictionnaires. | |
""" | |
query = """ | |
SELECT * | |
FROM messages | |
WHERE id_conversation = %s | |
ORDER BY timestamp ASC | |
""" | |
try: | |
result = db_manager.query(query, (id_conversation,)) | |
return [{"role": row["role"], "content": row["content"], "timestamp":row["timestamp"], "temps_traitement":row["temps_traitement"]} for row in result] | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
def load_conversations(db_manager, id_utilisateur: int) -> List[Dict]: | |
""" | |
Charge toutes les conversations enregistrées pour un utilisateur donné. | |
:param db_manager: Instance de DBManager. | |
:param id_utilisateur: ID de l'utilisateur. | |
:return: Liste des conversations. | |
""" | |
query = """ | |
SELECT * FROM conversations | |
WHERE id_utilisateur = %s | |
ORDER BY created_at DESC | |
""" | |
try: | |
result = db_manager.query(query, (id_utilisateur,)) | |
return [ | |
{"id_conversation": row["id_conversation"], "created_at": row["created_at"], "title": row["title"]} for row in result | |
] | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
def update_conversation(db_manager, id_conversation: int, id_utilisateur: int) -> None: | |
""" | |
Met à jour le champ `created_at` d'une conversation donnée pour un utilisateur. | |
:param db_manager: Instance de DBManager. | |
:param id_conversation: ID de la conversation à mettre à jour. | |
:param id_utilisateur: ID de l'utilisateur. | |
""" | |
new_timer = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
query = """ | |
UPDATE conversations | |
SET created_at = %s | |
WHERE id_conversation = %s AND id_utilisateur = %s | |
""" | |
try: | |
db_manager.query(query, (new_timer, id_conversation, id_utilisateur)) | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
def update_conversation_title(db_manager, id_conversation: int, new_title: str) -> None: | |
""" | |
Met à jour le titre d'une conversation si celui-ci est encore "Nouvelle conversation". | |
:param db_manager: Instance de DBManager. | |
:param id_conversation: ID de la conversation à mettre à jour. | |
:param new_title: Nouveau titre de la conversation. | |
""" | |
query = """ | |
UPDATE conversations | |
SET title = %s | |
WHERE id_conversation = %s AND title = 'Nouvelle conversation' | |
""" | |
try: | |
db_manager.query(query, (new_title, id_conversation)) | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
def get_conversation_title(db_manager, id_conversation: int) -> str: | |
""" | |
Récupère le titre d'une conversation spécifique en utilisant `fetch_by_condition`. | |
:param db_manager: Instance de DBManager. | |
:param id_conversation: ID de la conversation à interroger. | |
:return: Le titre de la conversation ou "Nouvelle conversation". | |
""" | |
table_name = "conversations" | |
condition = "id_conversation = %s" | |
try: | |
results = db_manager.fetch_by_condition(table_name, condition, (id_conversation,)) | |
if results: | |
# Suppose que `title` est la troisième colonne | |
return results[0][2] | |
return "Nouvelle conversation" | |
except psycopg2.Error as err: | |
logger.error(f"Erreur de connexion : {err}") | |
return | |
def delete_conversation(db_manager, id_conversation: int) -> None: | |
""" | |
Supprime une conversation et ses messages associés de la base de données. | |
:param db_manager: Instance de DBManager. | |
:param id_conversation: ID de la conversation à supprimer. | |
""" | |
try: | |
# Supprimer les messages liés à la conversation | |
query_delete_messages = "DELETE FROM messages WHERE id_conversation = %s" | |
db_manager.query(query_delete_messages, (id_conversation,)) | |
# Supprimer la conversation elle-même | |
query_delete_conversation = "DELETE FROM conversations WHERE id_conversation = %s" | |
db_manager.query(query_delete_conversation, (id_conversation,)) | |
print(f"✅ Conversation {id_conversation} supprimée avec succès.") | |
except Exception as e: | |
print(f"❌ Erreur lors de la suppression de la conversation {id_conversation}: {e}") | |
def load_chatbot_suggestions(db_manager, user_id): | |
""" | |
Charge les suggestions du chatbot enregistrées pour un utilisateur donné. | |
""" | |
query = "SELECT repas_suggestion FROM suggestions_repas WHERE id_utilisateur = %s" | |
try: | |
db_manager.cursor.execute(query, (user_id,)) | |
suggestions = [row[0] for row in db_manager.cursor.fetchall()] | |
return suggestions | |
except psycopg2.Error as err: | |
logger.error(f"Erreur lors du chargement des suggestions : {err}") | |
return [] | |
def save_chatbot_suggestions(db_manager, user_id, suggestions): | |
""" | |
Sauvegarde les suggestions du chatbot dans la base de données. | |
""" | |
query = """ | |
INSERT INTO suggestions_repas (id_utilisateur, repas_suggestion, motif_suggestion) | |
VALUES (%s, %s, %s) | |
""" | |
try: | |
for suggestion in suggestions: | |
db_manager.cursor.execute(query, (user_id, suggestion, "Chatbot")) | |
db_manager.connection.commit() | |
except psycopg2.Error as err: | |
logger.error(f"Erreur lors de l'enregistrement des suggestions : {err}") | |