# Code Documentation

## Introduction

This document describes the code that processes data from a CSV file and simulates how a neural network learns from it. The code covers network initialization, CSV processing, simulation of the learning process, and saving and loading the model.

## Dependencies

- `pandas` (imported as `pd`): CSV processing.
- `numpy` (`np`): numerical operations.
- `random`: random operations.
- `tqdm`: progress bars.
- `tkinter` (`tk`, `ttk`): graphical user interface.
- `seaborn` (`sns`): visualization.
- `networkx` (`nx`): building and analyzing graphs.
- `json`: saving and loading models.
- `os`: file operations.
- `time`: timing measurements.
- `torch` (with `torch.nn` as `nn`): neural networks.
- `threading`: thread management.
- `logging`: logging.
- `sqlite3`: SQLite databases.
- `dask.dataframe` (`dd`): parallel data processing.

The plotting code additionally relies on `matplotlib` (`plt`, `FigureCanvasTkAgg`), and `find_similar_question` uses `difflib` from the standard library.

## Logger Configuration

```python
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
```

## Global Variables

- `initialized`: Flag indicating whether the network has been initialized.
- `category_nodes`: List of nodes in the network.
- `questions`: List of questions.
- `model_saved`: Guard flag indicating whether the model has been saved.

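
The four module-level variables above act as shared state for the loading, saving, and CSV-processing functions. A minimal sketch of how they might be declared follows; the initial values and the helper `mark_model_saved` are assumptions made for illustration and do not appear in the original code.

```python
import logging

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)s - %(message)s")

# Assumed initial values; the original only names these variables.
initialized = False   # becomes True once a model has been loaded
category_nodes = []   # Node objects, one per category
questions = []        # dicts with "question", "category", "answer"
model_saved = False   # guard flag, set after a successful save


def mark_model_saved():
    """Hypothetical helper: flip the guard flag after a successful save."""
    global model_saved
    model_saved = True
    logging.info("model_saved set to True")
    return model_saved
```

Any function that assigns to one of these names needs a `global` declaration first, as the helper does here; reading them requires no declaration.
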
## Checking That the Output Folder Exists

```python
output_dir = "plots"
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
```

## Functions

### Splitting the CSV File

```python
def split_csv(filename, chunk_size=1000, output_dir="data"):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    chunk_iter = pd.read_csv(filename, chunksize=chunk_size)
    for i, chunk in enumerate(chunk_iter):
        chunk.to_csv(os.path.join(output_dir, f"data_part_{i}.csv"), index=False)
        logging.info(f"Chunk {i} mit {len(chunk)} Zeilen gespeichert.")
```

### Improvement 1: Strengthening Connections for Frequently Asked Questions

```python
def strengthen_question_connection(category_nodes, question, category):
    category_node = next((node for node in category_nodes if node.label == category), None)
    if category_node:
        for conn in category_node.connections:
            if conn.target_node.label == question:
                old_weight = conn.weight
                conn.weight += 0.1  # Strengthen the connection
                conn.weight = np.clip(conn.weight, 0, 1.0)
                logging.info(f"Verstärkte Verbindung für Frage '{question}' in Kategorie '{category}': {old_weight:.4f} -> {conn.weight:.4f}")
```

### Improvement 2: Extended Hebbian Learning Rule for Better Question Assignment

```python
def enhanced_hebbian_learning(node, target_node, learning_rate=0.2, decay_factor=0.01):
    old_weight = None
    for conn in node.connections:
        if conn.target_node == target_node:
            old_weight = conn.weight
            conn.weight += learning_rate * node.activation * target_node.activation
            conn.weight = np.clip(conn.weight - decay_factor * conn.weight, 0, 1.0)
            break

    # Only log if a matching connection was found and updated
    if old_weight is not None:
        logging.info(f"Hebb'sches Lernen angewendet: Gewicht {old_weight:.4f} -> {conn.weight:.4f}")
```

### Improvement 3: Simulating Question Processing in the Network

```python
def simulate_question_answering(category_nodes, question, questions):
    category = next((q['category'] for q in questions if q['question'] == question), None)
    if not category:
        logging.warning(f"Frage '{question}' nicht gefunden!")
        return None

    category_node = next((node for node in category_nodes if node.label == category), None)
    if category_node:
        propagate_signal(category_node, input_signal=0.9, emotion_weights={}, emotional_state=1.0)
        activation = category_node.activation
        if activation is None or activation <= 0:
            logging.warning(f"Kategorie '{category}' hat eine ungültige Aktivierung: {activation}")
            return 0.0  # Return 0 if the activation failed
        logging.info(f"Verarbeite Frage: '{question}' → Kategorie: '{category}' mit Aktivierung {activation:.4f}")
        return activation  # Duplicate logging output removed
    else:
        logging.warning(f"Kategorie '{category}' nicht im Netzwerk gefunden. Die Kategorie wird neu hinzugefügt!")
        return 0.0
```

### Improvement 4: Finding the Best-Matching Question for a User Query

```python
def find_question_by_keyword(questions, keyword):
    matching_questions = [q for q in questions if keyword.lower() in q['question'].lower()]
    return matching_questions if matching_questions else None
```

### Improvement 5: Finding the Most Similar Question Using Simple Similarity Metrics

```python
def find_similar_question(questions, query):
    from difflib import get_close_matches

    question_texts = [q['question'] for q in questions]
    closest_matches = get_close_matches(query, question_texts, n=1, cutoff=0.6)
    if closest_matches:
        matched_question = next((q for q in questions if q['question'] == closest_matches[0]), None)
        return matched_question
    else:
        return {"question": "Keine passende Frage gefunden", "category": "Unbekannt"}
```

### Improvement 6: Test Function for Checking the Model

```python
def test_model(category_nodes, questions, query):
    matched_question = find_question_by_keyword(questions, query)
    if matched_question:
        logging.info(f"Gefundene Frage: {matched_question[0]['question']} -> Kategorie: {matched_question[0]['category']}")
        simulate_question_answering(category_nodes, matched_question[0]['question'], questions)
    else:
        logging.warning("Keine passende Frage gefunden.")

    similarity_question = find_similar_question(questions, query)
    logging.info(f"Ähnlichste Frage: {similarity_question['question']} -> Kategorie: {similarity_question['category']}")
```

### NetworkX Functions for Causal Graphs

```python
def build_causal_graph(category_nodes):
    G = nx.DiGraph()
    for node in category_nodes:
        G.add_node(node.label)
        for conn in node.connections:
            G.add_edge(node.label, conn.target_node.label, weight=conn.weight)
    return G

def analyze_causality_multiple(G, num_pairs=3):
    if len(G.nodes) < 2:
        logging.warning("Graph enthält nicht genügend Knoten für eine Analyse.")
        return
    for _ in range(num_pairs):
        # random.sample needs a sequence, so convert the node view to a list
        start_node, target_node = random.sample(list(G.nodes), 2)
        logging.info(f"Analysiere kausale Pfade von '{start_node}' nach '{target_node}'")
        try:
            paths = list(nx.all_simple_paths(G, source=start_node, target=target_node))
            if paths:
                for path in paths:
                    logging.info(f"Kausaler Pfad: {' -> '.join(path)}")
            else:
                logging.info(f"Kein Pfad gefunden von '{start_node}' nach '{target_node}'")
        except nx.NetworkXNoPath:
            logging.warning(f"Kein direkter Pfad zwischen '{start_node}' und '{target_node}' gefunden.")

def analyze_node_influence(G):
    # PageRank serves as the influence score of each node
    influence_scores = nx.pagerank(G, alpha=0.85)
    sorted_influences = sorted(influence_scores.items(), key=lambda x: x[1], reverse=True)
    for node, score in sorted_influences:
        logging.info(f"Knoten: {node}, Einfluss: {score:.4f}")
```

### Interventions Based on Pearl's Do-Operator

```python
def do_intervention(node, new_value):
    logging.info(f"Intervention: Setze {node.label} auf {new_value}")
    node.activation = new_value
    # Push the forced activation to all direct successors
    for conn in node.connections:
        conn.target_node.activation += node.activation * conn.weight
```

### Strengthening Context-Dependent Learning

```python
def contextual_causal_analysis(node, context_factors, learning_rate=0.1):
    context_factor = context_factors.get(node.label, 1.0)
    if node.activation > 0.8 and context_factor > 1.0:
        logging.info(f"Kausale Beziehung verstärkt für {node.label} aufgrund des Kontextes.")
        for conn in node.connections:
            conn.weight += learning_rate * context_factor
            conn.weight = np.clip(conn.weight, 0, 1.0)
            logging.info(f"Gewicht aktualisiert: {node.label} → {conn.target_node.label}, Gewicht: {conn.weight:.4f}")
```

### PyTorch Model for Causal Inference

```python
class CausalInferenceNN(nn.Module):
    def __init__(self):
        super(CausalInferenceNN, self).__init__()
        self.fc1 = nn.Linear(10, 20)
        self.fc2 = nn.Linear(20, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        return self.fc2(x)
```

### Debugging Function

```python
def debug_connections(category_nodes):
    start_time = time.time()
    for node in category_nodes:
        logging.info(f"Knoten: {node.label}")
        for conn in node.connections:
            logging.info(f"  Verbindung zu: {conn.target_node.label}, Gewicht: {conn.weight}")
    end_time = time.time()
    logging.info(f"debug_connections Ausführungszeit: {end_time - start_time:.4f} Sekunden")
```

### Helper Functions

```python
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def add_activation_noise(activation, noise_level=0.1):
    noise = np.random.normal(0, noise_level)
    return np.clip(activation + noise, 0.0, 1.0)

def decay_weights(category_nodes, decay_rate=0.002, forgetting_curve=0.95):
    for node in category_nodes:
        for conn in node.connections:
            conn.weight *= (1 - decay_rate) * forgetting_curve

def reward_connections(category_nodes, target_category, reward_factor=0.1):
    for node in category_nodes:
        if node.label == target_category:
            for conn in node.connections:
                conn.weight += reward_factor
                conn.weight = np.clip(conn.weight, 0, 1.0)

def apply_emotion_weight(activation, category_label, emotion_weights, emotional_state=1.0):
    emotion_factor = emotion_weights.get(category_label, 1.0) * emotional_state
    return activation * emotion_factor

def generate_simulated_answers(data, personality_distributions):
    simulated_answers = []
    for _, row in data.iterrows():
        category = row['Kategorie']
        mean = personality_distributions.get(category, 0.5)
        simulated_answer = np.clip(np.random.normal(mean, 0.2), 0.0, 1.0)
        simulated_answers.append(simulated_answer)
    return simulated_answers

def social_influence(category_nodes, social_network, influence_factor=0.1):
    for node in category_nodes:
        for conn in node.connections:
            social_impact = sum([social_network.get(conn.target_node.label, 0)]) * influence_factor
            conn.weight += social_impact
            conn.weight = np.clip(conn.weight, 0, 1.0)

def update_emotional_state(emotional_state, emotional_change_rate=0.02):
    emotional_state += np.random.normal(0, emotional_change_rate)
    return np.clip(emotional_state, 0.7, 1.5)

def apply_contextual_factors(activation, node, context_factors):
    context_factor = context_factors.get(node.label, 1.0)
    return activation * context_factor * random.uniform(0.9, 1.1)

def long_term_memory(category_nodes, long_term_factor=0.01):
    for node in category_nodes:
        for conn in node.connections:
            conn.weight += long_term_factor * conn.weight
            conn.weight = np.clip(conn.weight, 0, 1.0)

def hebbian_learning(node, learning_rate=0.3, weight_limit=1.0, reg_factor=0.005):
    for connection in node.connections:
        old_weight = connection.weight
        connection.weight += learning_rate * node.activation * connection.target_node.activation
        connection.weight = np.clip(connection.weight, -weight_limit, weight_limit)
        connection.weight -= reg_factor * connection.weight  # Regularization
        node.activation_history.append(node.activation)  # Store the activation
        connection.target_node.activation_history.append(connection.target_node.activation)
        logging.info(f"Hebb'sches Lernen: Gewicht von {old_weight:.4f} auf {connection.weight:.4f} erhöht")
```

### Network Structure Classes

```python
class Connection:
    def __init__(self, target_node, weight=None):
        self.target_node = target_node
        # Random initial weight unless one is given explicitly
        self.weight = weight if weight is not None else random.uniform(0.1, 1.0)

class Node:
    def __init__(self, label):
        self.label = label
        self.connections = []
        self.activation = 0.0
        self.activation_history = []

    def add_connection(self,
                       target_node, weight=None):
        self.connections.append(Connection(target_node, weight))

    def save_state(self):
        return {
            "label": self.label,
            "activation": self.activation,
            "activation_history": self.activation_history,
            "connections": [{"target": conn.target_node.label, "weight": conn.weight} for conn in self.connections]
        }

    @staticmethod
    def load_state(state, nodes_dict):
        node = Node(state["label"])
        node.activation = state["activation"]
        node.activation_history = state["activation_history"]
        for conn_state in state["connections"]:
            target_node = nodes_dict[conn_state["target"]]
            connection = Connection(target_node, conn_state["weight"])
            node.connections.append(connection)
        return node

class MemoryNode(Node):
    def __init__(self, label, memory_type="short_term"):
        super().__init__(label)
        self.memory_type = memory_type
        self.retention_time = {"short_term": 5, "mid_term": 20, "long_term": 100}[memory_type]
        self.time_in_memory = 0

    def decay(self, decay_rate, context_factors, emotional_state):
        context_factor = context_factors.get(self.label, 1.0)
        emotional_factor = emotional_state
        for conn in self.connections:
            # Short-term memories decay fastest, long-term memories slowest
            if self.memory_type == "short_term":
                conn.weight *= (1 - decay_rate * 2 * context_factor * emotional_factor)
            elif self.memory_type == "mid_term":
                conn.weight *= (1 - decay_rate * context_factor * emotional_factor)
            elif self.memory_type == "long_term":
                conn.weight *= (1 - decay_rate * 0.5 * context_factor * emotional_factor)

    def promote(self, activation_threshold=0.7):
        if len(self.activation_history) == 0:
            return
        if self.memory_type == "short_term" and np.mean(self.activation_history[-5:]) > activation_threshold:
            self.memory_type = "mid_term"
            self.retention_time = 20
        elif self.memory_type == "mid_term" and np.mean(self.activation_history[-20:]) > activation_threshold:
            self.memory_type = "long_term"
            self.retention_time = 100

class CortexCreativus(Node):
    def __init__(self, label):
        super().__init__(label)

    def generate_new_ideas(self, category_nodes):
        new_ideas = []
        for node in category_nodes:
            if node.activation > 0.5:
                new_idea = f"New idea based on {node.label} with activation {node.activation}"
                new_ideas.append(new_idea)
        return new_ideas

class SimulatrixNeuralis(Node):
    def __init__(self, label):
        super().__init__(label)

    def simulate_scenarios(self, category_nodes):
        scenarios = []
        for node in category_nodes:
            if node.activation > 0.5:
                scenario = f"Simulated scenario based on {node.label} with activation {node.activation}"
                scenarios.append(scenario)
        return scenarios

class CortexCriticus(Node):
    def __init__(self, label):
        super().__init__(label)

    def evaluate_ideas(self, ideas):
        evaluated_ideas = []
        for idea in ideas:
            evaluation_score = random.uniform(0, 1)
            evaluation = f"Evaluated idea: {idea} - Score: {evaluation_score}"
            evaluated_ideas.append(evaluation)
        return evaluated_ideas

class LimbusAffectus(Node):
    def __init__(self, label):
        super().__init__(label)

    def apply_emotion_weight(self, ideas, emotional_state):
        weighted_ideas = []
        for idea in ideas:
            weighted_idea = f"Emotionally weighted idea: {idea} - Weight: {emotional_state}"
            weighted_ideas.append(weighted_idea)
        return weighted_ideas

class MetaCognitio(Node):
    def __init__(self, label):
        super().__init__(label)

    def optimize_system(self, category_nodes):
        for node in category_nodes:
            node.activation *= random.uniform(0.9, 1.1)

class CortexSocialis(Node):
    def __init__(self, label):
        super().__init__(label)

    def simulate_social_interactions(self, category_nodes):
        interactions = []
        for node in category_nodes:
            if node.activation > 0.5:
                interaction = f"Simulated social interaction based on {node.label} with activation {node.activation}"
                interactions.append(interaction)
        return interactions

def connect_new_brains_to_network(category_nodes, new_brains):
    # Connect every new module to every category node in both directions
    for brain in new_brains:
        for node in category_nodes:
            brain.add_connection(node)
            node.add_connection(brain)
```

### Network Initialization

```python
def initialize_quiz_network(categories):
    try:
        category_nodes = [Node(c) for c in categories]
        # Fully connect the category nodes (no self-connections)
        for node in category_nodes:
            for target_node in category_nodes:
                if node != target_node:
                    node.add_connection(target_node)
                    logging.debug(f"Verbindung hinzugefügt: {node.label} → {target_node.label}")
        debug_connections(category_nodes)
        for node in category_nodes:
            logging.info(f"Knoten erstellt: {node.label}")
            for conn in node.connections:
                logging.info(f"  → Verbindung zu {conn.target_node.label} mit Gewicht {conn.weight:.4f}")
        return category_nodes
    except Exception as e:
        logging.error(f"Fehler bei der Netzwerk-Initialisierung: {e}")
        return []
```

### Signal Propagation

```python
def propagate_signal(node, input_signal, emotion_weights, emotional_state=1.0, context_factors=None):
    node.activation = add_activation_noise(sigmoid(input_signal * random.uniform(0.8, 1.2)))
    node.activation_history.append(node.activation)  # Store the activation
    node.activation = apply_emotion_weight(node.activation, node.label, emotion_weights, emotional_state)
    if context_factors:
        node.activation = apply_contextual_factors(node.activation, node, context_factors)
    logging.info(f"Signalpropagation für {node.label}: Eingangssignal {input_signal:.4f}")
    for connection in node.connections:
        logging.info(f"  → Signal an {connection.target_node.label} mit Gewicht {connection.weight:.4f}")
        connection.target_node.activation += node.activation * connection.weight

def propagate_signal_with_memory(node, input_signal, category_nodes, memory_nodes, context_factors, emotional_state):
    node.activation = add_activation_noise(sigmoid(input_signal))
    node.activation_history.append(node.activation)
    for connection in node.connections:
        connection.target_node.activation += node.activation * connection.weight
    for memory_node in memory_nodes:
        memory_node.time_in_memory += 1
        memory_node.promote()
```

### Simulation with Adjustments

```python
def simulate_learning(data, category_nodes, personality_distributions, epochs=1, learning_rate=0.8,
                      reward_interval=5, decay_rate=0.002, emotional_state=1.0, context_factors=None):
    if context_factors is None:
        context_factors = {}

    weights_history = {f"{node.label} → {conn.target_node.label}": []
                       for node in category_nodes for conn in node.connections}
    activation_history = {node.label: [] for node in category_nodes}

    # Create one node per question and link it to its category node
    question_nodes = []
    for idx, row in data.iterrows():
        q_node = Node(row['Frage'])
        question_nodes.append(q_node)
        category_label = row['Kategorie'].strip()
        category_node = next((c for c in category_nodes if c.label == category_label), None)
        if category_node:
            q_node.add_connection(category_node)
            logging.debug(f"Verbindung hinzugefügt: {q_node.label} → {category_node.label}")
        else:
            logging.warning(f"Warnung: Kategorie '{category_label}' nicht gefunden für Frage '{row['Frage']}'.")

    emotion_weights = {category: 1.0 for category in data['Kategorie'].unique()}
    social_network = {category: random.uniform(0.1, 1.0) for category in data['Kategorie'].unique()}

    for epoch in range(epochs):
        logging.info(f"\n--- Epoche {epoch + 1} ---")
        simulated_answers = generate_simulated_answers(data, personality_distributions)

        for node in category_nodes:
            node.activation_sum = 0.0
            node.activation_count = 0

        for node in category_nodes:
            propagate_signal(node, random.uniform(0.1, 0.9), emotion_weights, emotional_state, context_factors)
            node.activation_history.append(node.activation)  # Store the activation

        for idx, q_node in enumerate(question_nodes):
            # Reset all activations before presenting the next question
            for node in category_nodes + question_nodes:
                node.activation = 0.0
            answer = simulated_answers[idx]
            propagate_signal(q_node, answer, emotion_weights, emotional_state, context_factors)
            q_node.activation_history.append(q_node.activation)  # Store the activation
            hebbian_learning(q_node, learning_rate)

            for node in category_nodes:
                node.activation_sum += node.activation
                if node.activation > 0:
                    node.activation_count += 1

            for node in category_nodes:
                for conn in node.connections:
                    weights_history[f"{node.label} → {conn.target_node.label}"].append(conn.weight)
                    logging.debug(f"Gewicht aktualisiert: {node.label} → {conn.target_node.label}, Gewicht: {conn.weight}")

            # Apply causal reinforcement
            contextual_causal_analysis(q_node, context_factors, learning_rate)

        for node in category_nodes:
            if node.activation_count > 0:
                mean_activation = node.activation_sum / node.activation_count
                activation_history[node.label].append(mean_activation)
                logging.info(f"Durchschnittliche Aktivierung für Knoten {node.label}: {mean_activation:.4f}")
            else:
                activation_history[node.label].append(0.0)
                logging.info(f"Knoten {node.label} wurde in dieser Epoche nicht aktiviert.")

        if (epoch + 1) % reward_interval == 0:
            target_category = random.choice(data['Kategorie'].unique())
            reward_connections(category_nodes, target_category=target_category)

        decay_weights(category_nodes, decay_rate=decay_rate)
        social_influence(category_nodes, social_network)

    logging.info("Simulation abgeschlossen. Ergebnisse werden analysiert...")
    return activation_history, weights_history
```

### Simulation with Multi-Level Memory

```python
def simulate_multilevel_memory(data, category_nodes, personality_distributions, epochs=1):
    # Use each node's label; falls back to the value itself if plain labels are passed
    short_term_memory = [MemoryNode(getattr(c, "label", c), "short_term") for c in category_nodes]
    mid_term_memory = []
    long_term_memory = []
    memory_nodes = short_term_memory + mid_term_memory + long_term_memory
    context_factors = {question: random.uniform(0.9, 1.1) for question in data['Frage'].unique()}
    emotional_state = 1.0

    for epoch in range(epochs):
        logging.info(f"\n--- Epoche {epoch + 1} ---")
        for node in short_term_memory:
            input_signal = random.uniform(0.1, 1.0)
            propagate_signal_with_memory(node, input_signal, category_nodes, memory_nodes, context_factors, emotional_state)
        for memory_node in memory_nodes:
            memory_node.decay(decay_rate=0.01, context_factors=context_factors, emotional_state=emotional_state)
        for memory_node in memory_nodes:
            memory_node.promote()
        short_term_memory, mid_term_memory, long_term_memory = update_memory_stages(memory_nodes)
        logging.info(f"Epoche {epoch + 1}: Kurzzeit {len(short_term_memory)}, Mittelzeit {len(mid_term_memory)}, Langzeit {len(long_term_memory)}")

    return short_term_memory, mid_term_memory, long_term_memory

def update_memory_stages(memory_nodes):
    short_term_memory = [node for node in memory_nodes if node.memory_type == "short_term"]
    mid_term_memory = [node for node in memory_nodes if node.memory_type == "mid_term"]
    long_term_memory = [node for node in memory_nodes if node.memory_type == "long_term"]
    return short_term_memory, mid_term_memory, long_term_memory
```

### Plot Functions

```python
def plot_activation_history(activation_history, filename="activation_history.png"):
    if not activation_history:
        logging.warning("No activation history to plot")
        return
    plt.figure(figsize=(12, 8))
    for label, activations in activation_history.items():
        if len(activations) > 0:
            plt.plot(range(1, len(activations) + 1), activations, label=label)
    plt.title("Entwicklung der Aktivierungen während des Lernens")
    plt.xlabel("Epoche")
    plt.ylabel("Aktivierung")
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot gespeichert unter: {os.path.join(output_dir, filename)}")

def plot_dynamics(activation_history, weights_history, filename="dynamics.png"):
    if not weights_history:
        logging.error("weights_history ist leer.")
        return
    plt.figure(figsize=(16, 12))

    # Panel 1: activations over time
    plt.subplot(2, 2, 1)
    for label, activations in activation_history.items():
        if len(activations) > 0:
            plt.plot(range(1, len(activations) + 1), activations, label=label)
    plt.title("Entwicklung der Aktivierungen während des Lernens")
    plt.xlabel("Epoche")
    plt.ylabel("Aktivierung")
    plt.legend()
    plt.grid(True)

    # Panel 2: connection weights over time
    plt.subplot(2, 2, 2)
    for label, weights in weights_history.items():
        if len(weights) > 0:
            plt.plot(range(1, len(weights) + 1), weights, label=label, alpha=0.7)
    plt.title("Entwicklung der Verbindungsgewichte während des Lernens")
    plt.xlabel("Epoche")
    plt.ylabel("Gewicht")
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True)

    plt.savefig(os.path.join(output_dir,
                             filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot gespeichert unter: {os.path.join(output_dir, filename)}")

def plot_memory_distribution(short_term_memory, mid_term_memory, long_term_memory, filename="memory_distribution.png"):
    counts = [len(short_term_memory), len(mid_term_memory), len(long_term_memory)]
    labels = ["Kurzfristig", "Mittelfristig", "Langfristig"]
    plt.figure(figsize=(8, 6))
    plt.bar(labels, counts, color=["red", "blue", "green"])
    plt.title("Verteilung der Gedächtnisknoten")
    plt.ylabel("Anzahl der Knoten")
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot gespeichert unter: {os.path.join(output_dir, filename)}")

def plot_activation_heatmap(activation_history, filename="activation_heatmap.png"):
    if not activation_history:
        logging.warning("No activation history to plot")
        return
    # Truncate all series to the shortest one so they form a rectangular array
    min_length = min(len(activations) for activations in activation_history.values())
    truncated_activations = {key: values[:min_length] for key, values in activation_history.items()}
    plt.figure(figsize=(12, 8))
    # Transpose so rows are epochs and columns are categories, matching the axis labels
    heatmap_data = np.array(list(truncated_activations.values())).T
    if heatmap_data.size == 0:
        logging.error("Heatmap-Daten sind leer. Überprüfen Sie die Aktivierungshistorie.")
        return
    sns.heatmap(heatmap_data, cmap="YlGnBu", xticklabels=list(truncated_activations.keys()), yticklabels=False)
    plt.title("Heatmap der Aktivierungswerte")
    plt.xlabel("Kategorie")
    plt.ylabel("Epoche")
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot gespeichert unter: {os.path.join(output_dir, filename)}")

def plot_network_topology(category_nodes, new_brains, filename="network_topology.png"):
    G = nx.DiGraph()
    for node in category_nodes:
        G.add_node(node.label)
        for conn in node.connections:
            G.add_edge(node.label, conn.target_node.label, weight=conn.weight)
    for brain in new_brains:
        G.add_node(brain.label, color='red')
        for conn in brain.connections:
            G.add_edge(brain.label, conn.target_node.label, weight=conn.weight)
    pos = nx.spring_layout(G)
    edge_labels = {(u, v): d['weight'] for u, v, d in G.edges(data=True)}
    node_colors = [G.nodes[node].get('color', 'skyblue') for node in G.nodes()]
    nx.draw(G, pos, with_labels=True, node_size=3000, node_color=node_colors,
            font_size=10, font_weight="bold", edge_color="gray")
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels)
    plt.title("Netzwerktopologie")
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot gespeichert unter: {os.path.join(output_dir, filename)}")
```

### Saving and Loading the Model

```python
def save_model(category_nodes, filename="model.json"):
    model_data = {
        "nodes": [node.save_state() for node in category_nodes]
    }
    with open(filename, "w") as file:
        json.dump(model_data, file, indent=4)
    logging.info(f"Modell gespeichert in {filename}")

def save_model_with_questions_and_answers(category_nodes, questions, filename="model_with_qa.json"):
    global model_saved
    logging.info("Starte Speichern des Modells...")

    # Check whether anything has changed
    current_model_data = {
        "nodes": [node.save_state() for node in category_nodes],
        "questions": questions
    }

    if os.path.exists(filename):
        try:
            with open(filename, "r", encoding="utf-8") as file:
                existing_model_data = json.load(file)
            if existing_model_data == current_model_data:
                logging.info("Keine Änderungen erkannt, erneutes Speichern übersprungen.")
                return
        except Exception as e:
            logging.warning(f"Fehler beim Überprüfen des vorhandenen Modells: {e}")

    # Save the updated model
    try:
        with open(filename, "w", encoding="utf-8") as file:
            json.dump(current_model_data, file, indent=4)
        logging.info(f"Modell erfolgreich gespeichert unter {filename}.")
        model_saved = True  # Set to True after a successful save
    except Exception as e:
        logging.error(f"Fehler beim Speichern des Modells: {e}")

def load_model_with_questions_and_answers(filename="model_with_qa.json"):
    global initialized
    if initialized:
        logging.info("Modell bereits initialisiert.")
        return None, None
    if not os.path.exists(filename):
        logging.warning(f"Datei {filename} nicht gefunden. Netzwerk wird initialisiert.")
        return None, None
    try:
        with open(filename, "r", encoding="utf-8") as file:
            model_data = json.load(file)
        # First create all nodes, then restore connections between them
        nodes_dict = {node_data["label"]: Node(node_data["label"]) for node_data in model_data["nodes"]}
        for node_data in model_data["nodes"]:
            node = nodes_dict[node_data["label"]]
            node.activation = node_data.get("activation", 0.0)
            for conn_state in node_data["connections"]:
                target_node = nodes_dict.get(conn_state["target"])
                if target_node:
                    node.add_connection(target_node, conn_state["weight"])
        questions = model_data.get("questions", [])
        logging.info(f"Modell geladen mit {len(nodes_dict)} Knoten und {len(questions)} Fragen")
        initialized = True
        return list(nodes_dict.values()), questions
    except json.JSONDecodeError as e:
        logging.error(f"Fehler beim Parsen der JSON-Datei: {e}")
        return None, None
```

### Updating Questions

```python
def update_questions_with_answers(filename="model_with_qa.json"):
    with open(filename, "r") as file:
        model_data = json.load(file)
    # Prompt interactively for every question that has no answer yet
    for question in model_data["questions"]:
        if "answer" not in question:
            question["answer"] = input(f"Gib die Antwort für: '{question['question']}': ")
    with open(filename, "w") as file:
        json.dump(model_data, file, indent=4)
    logging.info(f"Fragen wurden mit Antworten aktualisiert und gespeichert in {filename}")
```

### Finding the Best Answer

```python
def find_best_answer(category_nodes, questions, query):
    matched_question = find_similar_question(questions, query)
    if matched_question:
        logging.info(f"Gefundene Frage: {matched_question['question']} -> Kategorie: {matched_question['category']}")
        answer = matched_question.get("answer", "Keine Antwort verfügbar")
        logging.info(f"Antwort: {answer}")
        return answer
    else:
        logging.warning("Keine passende Frage gefunden.")
        return None
```

### Creating the Dashboard

```python
def create_dashboard(category_nodes, activation_history, short_term_memory, mid_term_memory, long_term_memory):
    root = tk.Tk()
    root.title("Psyco Dashboard")

    # Activation history
    activation_frame = ttk.Frame(root, padding="10")
    activation_frame.pack(fill=tk.BOTH, expand=True)
    activation_label = ttk.Label(activation_frame, text="Aktivierungshistorie")
    activation_label.pack()
    if activation_history:
        for label, activations in activation_history.items():
            fig, ax = plt.subplots()
            ax.plot(range(1, len(activations) + 1), activations)
            ax.set_title(label)
            canvas = FigureCanvasTkAgg(fig, master=activation_frame)
            canvas.draw()
            canvas.get_tk_widget().pack()
    else:
        no_data_label = ttk.Label(activation_frame, text="Keine Aktivierungshistorie verfügbar.")
        no_data_label.pack()

    # Memory distribution
    memory_frame = ttk.Frame(root, padding="10")
    memory_frame.pack(fill=tk.BOTH, expand=True)
    memory_label = ttk.Label(memory_frame, text="Gedächtnisverteilung")
    memory_label.pack()
    memory_counts = [len(short_term_memory), len(mid_term_memory), len(long_term_memory)]
    labels = ["Kurzfristig", "Mittelfristig", "Langfristig"]
    fig, ax = plt.subplots()
    ax.bar(labels, memory_counts, color=["red", "blue", "green"])
    ax.set_title("Verteilung der Gedächtnisknoten")
    ax.set_ylabel("Anzahl der Knoten")
    canvas = FigureCanvasTkAgg(fig, master=memory_frame)
    canvas.draw()
    canvas.get_tk_widget().pack()

    # Network topology
    topology_frame = ttk.Frame(root, padding="10")
    topology_frame.pack(fill=tk.BOTH, expand=True)
    topology_label = ttk.Label(topology_frame, text="Netzwerktopologie")
    topology_label.pack()
    G = nx.DiGraph()
    for node in category_nodes:
        G.add_node(node.label)
        for conn in node.connections:
            G.add_edge(node.label, conn.target_node.label, weight=conn.weight)
    pos = nx.spring_layout(G)
    edge_labels = {(u, v): d['weight'] for u, v, d in G.edges(data=True)}
    node_colors = ['skyblue' for _ in G.nodes()]
    fig, ax = plt.subplots()
    nx.draw(G, pos, with_labels=True, node_size=3000, node_color=node_colors,
            font_size=10, font_weight="bold", edge_color="gray", ax=ax)
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, ax=ax)
    ax.set_title("Netzwerktopologie")
    canvas = FigureCanvasTkAgg(fig, master=topology_frame)
    canvas.draw()
    canvas.get_tk_widget().pack()

    # Heatmap of activation values
    heatmap_frame = ttk.Frame(root, padding="10")
    heatmap_frame.pack(fill=tk.BOTH, expand=True)
    heatmap_label = ttk.Label(heatmap_frame, text="Heatmap der Aktivierungswerte")
    heatmap_label.pack()
    if activation_history:
        min_length = min(len(activations) for activations in activation_history.values())
        truncated_activations = {key: values[:min_length] for key, values in activation_history.items()}
        # Transpose so rows are epochs and columns are categories, matching the axis labels
        heatmap_data = np.array(list(truncated_activations.values())).T
        if heatmap_data.size > 0:
            fig, ax = plt.subplots()
            sns.heatmap(heatmap_data, cmap="YlGnBu", xticklabels=list(truncated_activations.keys()),
                        yticklabels=False, ax=ax)
            ax.set_title("Heatmap der Aktivierungswerte")
            ax.set_xlabel("Kategorie")
            ax.set_ylabel("Epoche")
            canvas = FigureCanvasTkAgg(fig, master=heatmap_frame)
            canvas.draw()
            canvas.get_tk_widget().pack()
        else:
            no_data_label = ttk.Label(heatmap_frame,
text="Heatmap-Daten sind leer. Überprüfen Sie die Aktivierungshistorie.") no_data_label.pack() else: no_data_label = ttk.Label(heatmap_frame, text="Keine Aktivierungshistorie verfügbar.") no_data_label.pack() root.mainloop() ``` ### CSV-Datei verarbeiten ```python def process_csv_in_chunks(filename, chunk_size=10000): global category_nodes, questions logging.info(f"Beginne Verarbeitung der Datei: {filename}") try: # Test, ob die Datei existiert if not os.path.exists(filename): logging.error(f"Datei {filename} nicht gefunden.") return None all_chunks = [] for chunk in pd.read_csv(filename, chunksize=chunk_size, encoding="utf-8", on_bad_lines='skip'): logging.info(f"Chunk mit {len(chunk)} Zeilen gelesen.") if 'Frage' not in chunk.columns or 'Kategorie' not in chunk.columns or 'Antwort' not in chunk.columns: logging.error("CSV-Datei enthält nicht die erwarteten Spalten: 'Frage', 'Kategorie', 'Antwort'") return None all_chunks.append(chunk) data = pd.concat(all_chunks, ignore_index=True) logging.info(f"Alle Chunks erfolgreich verarbeitet. 
Gesamtzeilen: {len(data)}") return data except pd.errors.EmptyDataError: logging.error("CSV-Datei ist leer.") except pd.errors.ParserError as e: logging.error(f"Parsing-Fehler in CSV-Datei: {e}") except Exception as e: logging.error(f"Unerwarteter Fehler beim Verarbeiten der Datei: {e}") return None ``` ### Einzelne Einträge verarbeiten ```python def process_single_entry(question, category, answer): global category_nodes, questions # Sicherstellen, dass die globalen Variablen initialisiert sind if category_nodes is None: category_nodes = [] logging.warning("Kategorie-Knotenliste war None, wurde nun initialisiert.") if questions is None: questions = [] logging.warning("Fragenliste war None, wurde nun initialisiert.") # Überprüfen, ob die Kategorie bereits vorhanden ist if not any(node.label == category for node in category_nodes): category_nodes.append(Node(category)) logging.info(f"Neue Kategorie '{category}' dem Netzwerk hinzugefügt.") # Frage, Kategorie und Antwort zur Liste hinzufügen questions.append({"question": question, "category": category, "answer": answer}) logging.info(f"Neue Frage hinzugefügt: '{question}' -> Kategorie: '{category}'") ``` ### CSV-Datei mit Dask verarbeiten ```python def process_csv_with_dask(filename, chunk_size=10000): try: ddf = dd.read_csv(filename, blocksize=chunk_size) ddf = ddf.astype({'Kategorie': 'category'}) for row in ddf.itertuples(index=False, name=None): process_single_entry(row[0], row[1], row[2]) logging.info("Alle Chunks erfolgreich mit Dask verarbeitet.") except Exception as e: logging.error(f"Fehler beim Verarbeiten der Datei mit Dask: {e}") ``` ### In SQLite speichern ```python def save_to_sqlite(filename, db_name="dataset.db"): conn = sqlite3.connect(db_name) chunk_iter = pd.read_csv(filename, chunksize=10000) for chunk in chunk_iter: chunk.to_sql("qa_data", conn, if_exists="append", index=False) logging.info(f"Chunk mit {len(chunk)} Zeilen gespeichert.") conn.close() logging.info("CSV-Daten wurden erfolgreich in 
SQLite gespeichert.") ``` ### Aus SQLite laden ```python def load_from_sqlite(db_name="dataset.db"): conn = sqlite3.connect(db_name) query = "SELECT Frage, Kategorie, Antwort FROM qa_data" data = pd.read_sql_query(query, conn) conn.close() return data ``` ### Teilmodell speichern ```python def save_partial_model(filename="partial_model.json"): model_data = { "nodes": [node.save_state() for node in category_nodes], "questions": questions } with open(filename, "w") as file: json.dump(model_data, file, indent=4) logging.info("Teilmodell gespeichert.") ``` ### CSV-Datei faul laden ```python def lazy_load_csv(filename, chunk_size=10000): for chunk in pd.read_csv(filename, chunksize=chunk_size): for _, row in chunk.iterrows(): yield row['Frage'], row['Kategorie'], row['Antwort'] ``` ### Hauptfunktion ```python def main(): start_time = time.time() category_nodes, questions = load_model_with_questions_and_answers("model_with_qa.json") if category_nodes is None: csv_file = "data.csv" data = process_csv_in_chunks(csv_file) if data is None: logging.error("Fehler beim Laden der CSV-Datei.") return if len(data) > 1000: logging.info("Datei hat mehr als 1000 Zeilen. 
Aufteilen in kleinere Dateien...") split_csv(csv_file) # Verarbeite jede aufgeteilte Datei data_dir = "data" for filename in os.listdir(data_dir): if filename.endswith(".csv"): file_path = os.path.join(data_dir, filename) logging.info(f"Verarbeite Datei: {file_path}") data = process_csv_in_chunks(file_path) if data is None: logging.error("Fehler beim Laden der CSV-Datei.") return categories = data['Kategorie'].unique() category_nodes = initialize_quiz_network(categories) questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()] personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]} activation_history, weights_history = simulate_learning(data, category_nodes, personality_distributions) save_model_with_questions_and_answers(category_nodes, questions) else: logging.info("Datei hat weniger als 1000 Zeilen. Keine Aufteilung erforderlich.") categories = data['Kategorie'].unique() category_nodes = initialize_quiz_network(categories) questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()] personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]} activation_history, weights_history = simulate_learning(data, category_nodes, personality_distributions) save_model_with_questions_and_answers(category_nodes, questions) end_time = time.time() logging.info(f"Simulation abgeschlossen. 
Gesamtdauer: {end_time - start_time:.2f} Sekunden") ``` ### Simulation aus der GUI starten ```python def run_simulation_from_gui(learning_rate, decay_rate, reward_interval, epochs): global model_saved model_saved = False # Erzwinge das Speichern nach dem Training start_time = time.time() csv_file = "data.csv" category_nodes, questions = load_model_with_questions_and_answers("model_with_qa.json") if category_nodes is None: data = process_csv_in_chunks(csv_file) if not isinstance(data, pd.DataFrame): logging.error("Fehler beim Laden der CSV-Datei. Erwarteter DataFrame wurde nicht zurückgegeben.") return if len(data) > 1000: logging.info("Datei hat mehr als 1000 Zeilen. Aufteilen in kleinere Dateien...") split_csv(csv_file) # Verarbeite jede aufgeteilte Datei data_dir = "data" for filename in os.listdir(data_dir): if filename.endswith(".csv"): file_path = os.path.join(data_dir, filename) logging.info(f"Verarbeite Datei: {file_path}") data = process_csv_in_chunks(file_path) if not isinstance(data, pd.DataFrame): logging.error("Fehler beim Laden der CSV-Datei. Erwarteter DataFrame wurde nicht zurückgegeben.") return categories = data['Kategorie'].unique() category_nodes = initialize_quiz_network(categories) questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()] personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]} activation_history, weights_history = simulate_learning( data, category_nodes, personality_distributions, epochs=int(epochs), learning_rate=float(learning_rate), reward_interval=int(reward_interval), decay_rate=float(decay_rate) ) save_model_with_questions_and_answers(category_nodes, questions) else: logging.info("Datei hat weniger als 1000 Zeilen. 
Keine Aufteilung erforderlich.") categories = data['Kategorie'].unique() category_nodes = initialize_quiz_network(categories) questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()] personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]} activation_history, weights_history = simulate_learning( data, category_nodes, personality_distributions, epochs=int(epochs), learning_rate=float(learning_rate), reward_interval=int(reward_interval), decay_rate=float(decay_rate) ) save_model_with_questions_and_answers(category_nodes, questions) else: data = process_csv_in_chunks(csv_file) if not isinstance(data, pd.DataFrame): logging.error("Fehler beim Laden der CSV-Datei. Erwarteter DataFrame wurde nicht zurückgegeben.") return logging.info(f"Anzahl der Zeilen in der geladenen CSV: {len(data)}") personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]} activation_history, weights_history = simulate_learning( data, category_nodes, personality_distributions, epochs=int(epochs), learning_rate=float(learning_rate), reward_interval=int(reward_interval), decay_rate=float(decay_rate) ) save_model_with_questions_and_answers(category_nodes, questions) end_time = time.time() logging.info(f"Simulation abgeschlossen. Gesamtdauer: {end_time - start_time:.2f} Sekunden") messagebox.showinfo("Ergebnis", f"Simulation abgeschlossen! Dauer: {end_time - start_time:.2f} Sekunden") ``` ### Netzwerk asynchron initialisieren ```python def async_initialize_network(): global category_nodes, questions, model_saved logging.info("Starte Initialisierung des Netzwerks...") category_nodes, questions = load_model_with_questions_and_answers("model_with_qa.json") if category_nodes is None: category_nodes = [] logging.warning("Keine gespeicherten Kategorien gefunden. 
Neues Netzwerk wird erstellt.") model_saved = False # Zurücksetzen der Speicher-Flagge if questions is None: questions = [] logging.warning("Keine gespeicherten Fragen gefunden. Neues Fragen-Array wird erstellt.") model_saved = False # Zurücksetzen der Speicher-Flagge if not category_nodes: csv_file = "data.csv" data = process_csv_in_chunks(csv_file) if isinstance(data, pd.DataFrame): if len(data) > 1000: logging.info("Datei hat mehr als 1000 Zeilen. Aufteilen in kleinere Dateien...") split_csv(csv_file) # Verarbeite jede aufgeteilte Datei data_dir = "data" for filename in os.listdir(data_dir): if filename.endswith(".csv"): file_path = os.path.join(data_dir, filename) logging.info(f"Verarbeite Datei: {file_path}") data = process_csv_in_chunks(file_path) if isinstance(data, pd.DataFrame): categories = data['Kategorie'].unique() category_nodes = initialize_quiz_network(categories) questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()] logging.info("Netzwerk aus CSV-Daten erfolgreich erstellt.") model_saved = False # Zurücksetzen der Speicher-Flagge else: logging.info("Datei hat weniger als 1000 Zeilen. Keine Aufteilung erforderlich.") categories = data['Kategorie'].unique() category_nodes = initialize_quiz_network(categories) questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()] logging.info("Netzwerk aus CSV-Daten erfolgreich erstellt.") model_saved = False # Zurücksetzen der Speicher-Flagge else: logging.error("Fehler beim Laden der CSV-Daten. 
Netzwerk konnte nicht initialisiert werden.") return save_model_with_questions_and_answers(category_nodes, questions) logging.info("Netzwerk erfolgreich initialisiert.") ``` ### GUI starten ```python def start_gui(): def start_simulation(): try: threading.Thread(target=run_simulation_from_gui, args=(0.8, 0.002, 5, 10), daemon=True).start() messagebox.showinfo("Info", "Simulation gestartet!") logging.info("Simulation gestartet") except Exception as e: logging.error(f"Fehler beim Start der Simulation: {e}") messagebox.showerror("Fehler", f"Fehler: {e}") root = tk.Tk() root.title("DRLCogNet GUI") root.geometry("400x300") header_label = tk.Label(root, text="Simulationseinstellungen", font=("Helvetica", 16)) header_label.pack(pady=10) start_button = tk.Button(root, text="Simulation starten", command=start_simulation) start_button.pack(pady=20) root.mainloop() ``` ### Hauptprogramm ```python if __name__ == "__main__": # Starte die Initialisierung in einem Thread threading.Thread(target=async_initialize_network, daemon=True).start() start_gui() ``` ## Fragen zur Datenbank (SQLite) ### Wird die Datenbank im Arbeitsspeicher erstellt? Ja, die SQLite-Datenbank wird im Arbeitsspeicher erstellt, wenn die Funktion `save_to_sqlite` aufgerufen wird. Diese Funktion erstellt eine SQLite-Datenbankdatei (standardmäßig `dataset.db`), die im Arbeitsspeicher gespeichert wird, wenn Sie sie nicht an einem anderen Ort speichern. ### Wie wird die Datenbank erstellt? Die Datenbank wird erstellt, indem eine Verbindung zur SQLite-Datenbank hergestellt wird. Wenn die Datei `dataset.db` nicht existiert, wird sie erstellt. Anschließend werden die Daten aus der CSV-Datei in Chunks gelesen und in die Tabelle `qa_data` der SQLite-Datenbank gespeichert. ### Wie werden die Daten in die Datenbank geladen? Die Daten werden in Chunks aus der CSV-Datei gelesen und in die Tabelle `qa_data` der SQLite-Datenbank gespeichert. 
Each chunk is written with the pandas method `DataFrame.to_sql`.

### How is the data loaded from the database?

`load_from_sqlite` opens a connection to the SQLite database, runs a SQL query against the table `qa_data`, and reads the result into a pandas DataFrame with `pandas.read_sql_query`.

### Example code for using the database

```python
# Save the data to the database
save_to_sqlite("data.csv")

# Load the data from the database
data = load_from_sqlite()
```

## Conclusion

This documentation gives an overview of the code and of how the SQLite database is used to store and load data. The code is modular and supports processing and simulating data from a CSV file in a neural network. The SQLite database is stored as a file on disk (`dataset.db` by default), not in memory, so the data can be reloaded later without parsing the CSV file again.
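
The difference between a file-backed and an in-memory SQLite database can be checked with a short experiment. The following is a minimal sketch that uses only the standard library `sqlite3` module; the helper names `write_rows` and `table_row_count` and the two sample rows are hypothetical and are not part of the documented code. It writes the same rows once to a file named `dataset.db` in a temporary directory and once to `:memory:`, then reopens each database and counts the rows.

```python
import os
import sqlite3
import tempfile


def write_rows(db_name, rows):
    """Hypothetical helper: create qa_data if needed and insert the given rows."""
    conn = sqlite3.connect(db_name)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS qa_data (Frage TEXT, Kategorie TEXT, Antwort TEXT)"
        )
        conn.executemany("INSERT INTO qa_data VALUES (?, ?, ?)", rows)
        conn.commit()
    finally:
        conn.close()


def table_row_count(db_name):
    """Hypothetical helper: open db_name again and count rows in qa_data (0 if missing)."""
    conn = sqlite3.connect(db_name)
    try:
        table = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='qa_data'"
        ).fetchone()
        if table is None:
            return 0
        return conn.execute("SELECT COUNT(*) FROM qa_data").fetchone()[0]
    finally:
        conn.close()


# Made-up sample rows, for illustration only
rows = [
    ("Was ist 2+2?", "Mathe", "4"),
    ("Hauptstadt von Frankreich?", "Geografie", "Paris"),
]

# File-backed database: the data survives closing the connection
with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "dataset.db")
    write_rows(db_path, rows)
    file_exists = os.path.exists(db_path)
    file_count = table_row_count(db_path)

# In-memory database: every connection to ":memory:" gets its own private,
# empty database, so the rows are gone once the first connection is closed
write_rows(":memory:", rows)
memory_count = table_row_count(":memory:")

print(file_exists, file_count, memory_count)  # prints: True 2 0
```

Because `save_to_sqlite` and `load_from_sqlite` each open and close their own connection, they only work together with a file-backed database; with `:memory:` the loaded table would not exist.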