# src/create_database.py
"""Parse data-update.txt into sections, attach keywords, and load them into the agent's memory database."""
import logging
import os
import re
from typing import Dict, List

from src.memory import MemoryManager  # Corrected import path

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
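
# Assumed MemoryManager interface (its implementation is not shown here): the
# code below relies on section_exists(concept) and
# add_semantic_memory(concept, description, tags=...).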
def parse_data_update(file_path: str) -> List[Dict[str, str]]:
    """Split the data-update file into sections keyed by their heading line."""
    if not os.path.exists(file_path):
        logging.error(f"File not found: {file_path}")
        return []
    with open(file_path, 'r') as file_obj:
        content = file_obj.read()
    content = content.lower()  # Normalize to lowercase
    sections = []
    lines = content.split('\n')
    current_section = None
    current_content = []
    for line in lines:
        stripped = line.strip()
        # A new section starts at any recognized heading line.
        if stripped.startswith(("chronique #", "flash info fl-", "chronique-faq #")):
            if current_section:
                sections.append({
                    "concept": current_section,
                    "description": "\n".join(current_content)
                })
                logging.info(f"Parsed section: {current_section}")
            current_section = stripped
            current_content = []
        else:
            current_content.append(line)
    # Flush the final section once the loop ends.
    if current_section:
        sections.append({
            "concept": current_section,
            "description": "\n".join(current_content)
        })
        logging.info(f"Parsed section: {current_section}")
    return sections
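
# For reference, parse_data_update expects data-update.txt to interleave
# heading lines and body text roughly like this (illustrative sketch only; the
# titles and numbers are hypothetical, and matching is case-insensitive
# because the content is lowercased first):
#
#   CHRONIQUE #12 some title
#   ...body lines for section 12...
#   FLASH INFO FL-034
#   ...body lines for the flash info...
#   CHRONIQUE-FAQ #5
#   ...body lines for the FAQ entry...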
def get_keywords(number: str, keyword_dir: str) -> List[str]:
    """Find the keyword file matching a section number, trying each naming scheme in turn."""
    candidates = [
        os.path.join(keyword_dir, f"FL-{number}-KEYWORD.txt"),
        os.path.join(keyword_dir, f"INFO-{number}-KEYWORD.txt"),
        os.path.join(keyword_dir, f"CHRONIQUE{number}-KEYWORD.txt"),
    ]
    keyword_file = next((path for path in candidates if os.path.exists(path)), None)
    if keyword_file is None:
        logging.warning(f"No keyword file found for section {number} in {keyword_dir}")
        return []
    with open(keyword_file, 'r') as file_obj:
        content = file_obj.read()
    # Keyword files may prefix the list with "KEYWORD = "; strip that prefix if present.
    if 'KEYWORD = ' in content:
        content = content.split('KEYWORD = ')[1]
    tags = [tag.strip() for tag in content.split(', ') if tag.strip()]  # Remove empty tags
    logging.info(f"Keywords for {number}: {tags}")
    return tags
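
# A keyword file is expected to hold one comma-separated list, optionally
# prefixed with "KEYWORD = " (hypothetical contents, inferred from the parsing
# above):
#
#   KEYWORD = tag-one, tag-two, tag-three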
def load_and_process_dataset(data_update_path: str, keyword_dir: str, db_path: str):
    """Parse the data-update file and store each new section in the memory database."""
    memory_manager = MemoryManager(db_path)
    sections = parse_data_update(data_update_path)
    for section in sections:
        concept = section['concept']
        description = section['description']
        # Extract the section number from the heading. "chronique #12" carries
        # it after '#', but "flash info fl-034" has no '#', so take the first
        # run of digits instead of splitting on '#' (which raised IndexError
        # for flash info headings).
        match = re.search(r'\d+', concept)
        if not match:
            logging.warning(f"No section number found in concept: {concept}")
            continue
        number = match.group()
        tags = get_keywords(number, keyword_dir)
        # Skip sections that are already in the database
        if not memory_manager.section_exists(concept):
            memory_manager.add_semantic_memory(concept, description, tags=tags)
            logging.info(f"Added section: {concept}")
        else:
            logging.info(f"Section already exists: {concept}")
if __name__ == "__main__":
    data_update_path = "data-update.txt"
    keyword_dir = "keyword"  # Updated keyword directory
    db_path = "agent.db"
    load_and_process_dataset(data_update_path, keyword_dir, db_path)
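
# Usage sketch: because of the src.memory import, this script is assumed to be
# run as a module from the repository root, e.g.
#
#   python -m src.create_database
#
# with data-update.txt, the keyword/ directory, and agent.db resolved relative
# to the working directory.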