File size: 3,352 Bytes
500516e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# src/create_database.py
import os
import json
from src.memory import MemoryManager  # Corrected import path
import logging
from typing import List, Dict

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def parse_data_update(file_path: str, keyword_dir: str) -> List[Dict[str, str]]:
    if not os.path.exists(file_path):
        logging.error(f"File not found: {file_path}")
        return []

    with open(file_path, 'r') as file_obj:
        content = file_obj.read()

    content = content.lower()  # Normalize to lowercase

    sections = []
    lines = content.split('\n')
    current_section = None
    current_content = []

    for line in lines:
        if line.strip().startswith("chronique #") or line.strip().startswith("flash info fl-") or line.strip().startswith("chronique-faq #"):
            if current_section:
                sections.append({
                    "concept": current_section,
                    "description": "\n".join(current_content)
                })
                logging.info(f"Parsed section: {current_section}")
            current_section = line.strip()
            current_content = []
        else:
            current_content.append(line)

    if current_section:
        sections.append({
            "concept": current_section,
            "description": "\n".join(current_content)
        })
        logging.info(f"Parsed section: {current_section}")

    return sections

def get_keywords(number: str, keyword_dir: str) -> List[str]:
    keyword_file = os.path.join(keyword_dir, f"FL-{number}-KEYWORD.txt")
    if not os.path.exists(keyword_file):
        keyword_file = os.path.join(keyword_dir, f"INFO-{number}-KEYWORD.txt")
    if not os.path.exists(keyword_file):
        keyword_file = os.path.join(keyword_dir, f"CHRONIQUE{number}-KEYWORD.txt")
    if not os.path.exists(keyword_file):
        logging.warning(f"Keyword file not found: {keyword_file}")
        return []

    with open(keyword_file, 'r') as file_obj:
        content = file_obj.read()
        if 'KEYWORD = ' in content:
            content = content.split('KEYWORD = ')[1]
        tags = content.split(', ')
        tags = [tag.strip() for tag in tags if tag.strip()]  # Remove empty tags
        logging.info(f"Keywords for {number}: {tags}")
        return tags

def load_and_process_dataset(data_update_path: str, keyword_dir: str, db_path: str):
    memory_manager = MemoryManager(db_path)

    sections = parse_data_update(data_update_path, keyword_dir)
    for section in sections:
        concept = section['concept']
        description = section['description']
        number = concept.split('#')[1].split()[0]  # Extract the number from the concept
        tags = get_keywords(number, keyword_dir)

        # Check if the section already exists in the database
        if not memory_manager.section_exists(concept):
            memory_manager.add_semantic_memory(concept, description, tags=tags)
            logging.info(f"Added section: {concept}")
        else:
            logging.info(f"Section already exists: {concept}")

if __name__ == "__main__":
    data_update_path = "data-update.txt"
    keyword_dir = "keyword"  # Updated keyword directory
    db_path = "agent.db"
    load_and_process_dataset(data_update_path, keyword_dir, db_path)