File size: 7,850 Bytes
500516e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# src/memory.py
import json
import logging
import sqlite3
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Configure the root logger once at import time; module code below relies on
# the module-level logging.info/... helpers inheriting this level and format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class MemoryManager:
    """SQLite-backed memory store for an agent.

    Persists two kinds of data:

    * ``semantic_memory`` -- long-lived concept/description records carrying
      JSON-encoded tags and an importance score, ranked for retrieval by a
      blend of TF-IDF cosine similarity, importance, and recency.
    * ``user_interactions`` -- short-lived query/response pairs, purged after
      five minutes by :meth:`cleanup_expired_interactions`.
    """

    def __init__(self, db_path: str):
        """Open (or create) the SQLite database at *db_path* and ensure the schema.

        Args:
            db_path: Filesystem path to the database file (``":memory:"``
                also works for an in-memory store).
        """
        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()
        self.create_tables()
        # 'english' selects sklearn's built-in English stop-word list.
        self.vectorizer = TfidfVectorizer(stop_words='english')
        logging.info("MemoryManager initialized and tables created.")

    def close(self):
        """Close the underlying database connection (new; previously leaked)."""
        self.conn.close()

    def create_tables(self):
        """Create tables and indexes if missing; migrate older schemas in place."""
        self.cursor.execute('''CREATE TABLE IF NOT EXISTS semantic_memory
                           (id INTEGER PRIMARY KEY, concept TEXT, description TEXT, last_accessed DATETIME, tags TEXT, importance REAL DEFAULT 0.5)''')

        # Migration path: databases created before `tags`/`importance` existed
        # get the columns added here (CREATE TABLE IF NOT EXISTS won't alter
        # an existing table).
        self.cursor.execute("PRAGMA table_info(semantic_memory)")
        existing_columns = {row[1] for row in self.cursor.fetchall()}
        if 'tags' not in existing_columns:
            self.cursor.execute("ALTER TABLE semantic_memory ADD COLUMN tags TEXT")
        if 'importance' not in existing_columns:
            self.cursor.execute("ALTER TABLE semantic_memory ADD COLUMN importance REAL DEFAULT 0.5")

        self.cursor.execute('''CREATE INDEX IF NOT EXISTS idx_semantic_concept ON semantic_memory (concept)''')
        self.cursor.execute('''CREATE INDEX IF NOT EXISTS idx_semantic_last_accessed ON semantic_memory (last_accessed)''')
        self.cursor.execute('''CREATE INDEX IF NOT EXISTS idx_semantic_tags ON semantic_memory (tags)''')

        self.cursor.execute('''CREATE TABLE IF NOT EXISTS user_interactions
                           (user_id TEXT, query TEXT, response TEXT, timestamp DATETIME)''')

        self.cursor.execute('''CREATE INDEX IF NOT EXISTS idx_user_interactions_timestamp ON user_interactions (timestamp)''')
        self.conn.commit()
        logging.info("Tables and indexes created successfully.")

    def add_semantic_memory(self, concept: str, description: str, tags: Optional[List[str]] = None):
        """Insert one semantic memory row; `importance` takes its 0.5 column default.

        Args:
            concept: Short name/key of the memory.
            description: Free-text body.
            tags: Optional list of tag strings; stored as a JSON array.
        """
        tags_str = json.dumps(tags if tags is not None else [])
        self.cursor.execute("INSERT INTO semantic_memory (concept, description, last_accessed, tags) VALUES (?, ?, ?, ?)",
                            (concept, description, datetime.now().isoformat(), tags_str))
        self.conn.commit()
        logging.info("Semantic memory added.")

    def retrieve_relevant_memories(self, query: str, limit: int = 30) -> List[Dict[str, Any]]:
        """Return up to *limit* memory dicts ranked by relevance to *query*.

        Queries that are empty or consist entirely of English stop words
        return an empty list instead of raising from an empty TF-IDF
        vocabulary.
        """
        # BUG FIX: `self.vectorizer.stop_words` is the *string* 'english', so
        # the previous `word in ...` test did substring matching against that
        # string. Use the resolved stop-word frozenset instead.
        stop_set = self.vectorizer.get_stop_words() or frozenset()
        tokens = query.split()
        if not tokens or all(token.lower() in stop_set for token in tokens):
            return []

        all_memories = self._get_all_memories()
        if not all_memories:
            return []

        scored_memories = self._score_memories(query, all_memories)
        scored_memories.sort(key=lambda pair: pair[1], reverse=True)
        return [memory for memory, _score in scored_memories[:limit]]

    def _get_all_memories(self) -> List[Tuple[Dict[str, Any], datetime, Optional[List[str]]]]:
        """Load every semantic memory as (record, last_accessed, tags) triples.

        The previous annotation advertised 2-tuples while the code produced
        3-tuples; the annotation now matches the actual shape.
        """
        self.cursor.execute("SELECT concept, description, importance, last_accessed, tags FROM semantic_memory ORDER BY importance DESC, last_accessed DESC")
        rows = self.cursor.fetchall()
        return [
            ({"concept": concept, "description": description, "importance": importance},
             datetime.fromisoformat(last_accessed),
             json.loads(tags) if tags else None)
            for concept, description, importance, last_accessed, tags in rows
        ]

    def _score_memories(self, query: str,
                        memories: List[Tuple[Dict[str, Any], datetime, Optional[List[str]]]]) -> List[Tuple[Dict[str, Any], float]]:
        """Score each memory as the mean of TF-IDF similarity, importance, recency.

        BUG FIX: the vectorizer is now fitted on the memory texts *plus* the
        query so the vocabulary covers both sides. The old code fitted on the
        query alone, which made every idf weight identical and raised
        ValueError for stop-word-only queries.
        """
        texts = [f"{memory['concept']} {memory['description']}" for memory, _ts, _tags in memories]
        try:
            tfidf = self.vectorizer.fit_transform(texts + [query])
        except ValueError:
            tfidf = None  # empty vocabulary: similarity falls back to 0.0
        query_vector = tfidf[-1] if tfidf is not None else None

        now = datetime.now()  # hoisted so all memories are scored against the same instant
        scored_memories = []
        for index, (memory, timestamp, _tags) in enumerate(memories):
            importance = memory.get('importance')
            if importance is None:  # legacy rows migrated without a stored value
                importance = 0.5

            if tfidf is not None:
                similarity = float(cosine_similarity(query_vector, tfidf[index])[0][0])
            else:
                similarity = 0.0

            if timestamp:
                recency = 1 / (1 + (now - timestamp).total_seconds() / 60)  # favor recent memories
            else:
                recency = 0.5  # neutral recency when last_accessed is unknown

            scored_memories.append((memory, (similarity + importance + recency) / 3))

        return scored_memories

    def section_exists(self, concept: str) -> bool:
        """Return True if any stored concept starts with *concept*.

        SQLite LIKE is case-insensitive for ASCII, so matching is effectively
        case-insensitive. NOTE(review): `%`/`_` inside *concept* act as LIKE
        wildcards — confirm callers never pass user-controlled patterns.
        """
        concept = concept.lower()
        self.cursor.execute("SELECT COUNT(*) FROM semantic_memory WHERE concept LIKE ?", (f"{concept}%",))
        return self.cursor.fetchone()[0] > 0

    def add_user_interaction(self, user_id: str, query: str, response: str):
        """Record one query/response pair for *user_id* with the current timestamp."""
        self.cursor.execute("INSERT INTO user_interactions (user_id, query, response, timestamp) VALUES (?, ?, ?, ?)",
                            (user_id, query, response, datetime.now().isoformat()))
        self.conn.commit()
        # Lazy %-style args: the message is only formatted if INFO is enabled.
        logging.info("User interaction added: User ID: %s, Query: %s, Response: %s", user_id, query, response)

    def get_user_interactions(self, user_id: str) -> List[Dict[str, Any]]:
        """Return all stored interactions for *user_id* as a list of dicts."""
        self.cursor.execute("SELECT query, response, timestamp FROM user_interactions WHERE user_id = ?", (user_id,))
        interactions = self.cursor.fetchall()
        return [{"query": query, "response": response, "timestamp": timestamp} for query, response, timestamp in interactions]

    def cleanup_expired_interactions(self):
        """Delete interactions older than five minutes.

        ISO-8601 timestamps compare correctly as strings, so the lexicographic
        `<` against the cutoff's isoformat is sound.
        """
        cutoff_time = datetime.now() - timedelta(minutes=5)
        self.cursor.execute("DELETE FROM user_interactions WHERE timestamp < ?", (cutoff_time.isoformat(),))
        self.conn.commit()
        logging.info("Expired user interactions cleaned up. Cutoff time: %s", cutoff_time)

    def get_section_description(self, section_name: str) -> str:
        """Return the description of the first concept starting with *section_name*.

        Returns an empty string (and logs a warning) when nothing matches.
        """
        section_name = section_name.lower()
        self.cursor.execute("SELECT description FROM semantic_memory WHERE concept LIKE ?", (f"{section_name}%",))
        result = self.cursor.fetchone()
        if result:
            logging.info("Found section: %s", section_name)
            return result[0]
        logging.warning("Section not found: %s", section_name)
        return ""

    def count_chroniques(self) -> int:
        """Count rows whose concept starts with 'chronique #'."""
        self.cursor.execute("SELECT COUNT(*) FROM semantic_memory WHERE concept LIKE 'chronique #%'")
        count = self.cursor.fetchone()[0]
        logging.info("Number of chroniques: %s", count)
        return count

    def count_flash_infos(self) -> int:
        """Count rows whose concept starts with 'flash info fl-'."""
        self.cursor.execute("SELECT COUNT(*) FROM semantic_memory WHERE concept LIKE 'flash info fl-%'")
        count = self.cursor.fetchone()[0]
        logging.info("Number of flash infos: %s", count)
        return count

    def count_chronique_faqs(self) -> int:
        """Count rows whose concept starts with 'chronique-faq #'."""
        self.cursor.execute("SELECT COUNT(*) FROM semantic_memory WHERE concept LIKE 'chronique-faq #%'")
        count = self.cursor.fetchone()[0]
        logging.info("Number of chronique-faqs: %s", count)
        return count

if __name__ == "__main__":
    # Script entry point: open the agent database and purge stale interactions.
    manager = MemoryManager("agent.db")
    manager.cleanup_expired_interactions()