Code Documentation

Introduction

This document describes code that processes data from a CSV file and uses it to simulate learning in a neural network. It covers functions for initializing the network, processing the CSV file, simulating the learning process, and saving and loading the model.

Dependencies

  • pandas: Processing CSV files.
  • numpy: Numerical operations.
  • random: Random operations.
  • tqdm: Progress bars.
  • tkinter: The graphical user interface.
  • seaborn: Visualization.
  • matplotlib: Plots (used as plt in the plot functions and the dashboard).
  • networkx: Building and analyzing graphs.
  • json: Saving and loading models.
  • os: File operations.
  • time: Timing measurements.
  • torch: Neural networks.
  • threading: Managing threads.
  • logging: Logging.
  • sqlite3: Working with SQLite databases.
  • dask.dataframe: Parallel processing of data.

Logger Configuration

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
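
To see what this format string produces, the following minimal sketch routes two records through a handler that uses the same format. The logger name `demo` and the in-memory stream are assumptions made for illustration; they are not part of the documented code.

```python
import io
import logging

# Same format string as in the basicConfig call above.
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"

stream = io.StringIO()                       # in-memory target instead of stderr
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter(LOG_FORMAT))

demo_logger = logging.getLogger("demo")      # hypothetical logger name
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False                # keep the records out of the root logger

demo_logger.info("network initialized")
demo_logger.debug("not shown at INFO level")

log_line = stream.getvalue().strip()
print(log_line)  # timestamp, then " - INFO - network initialized"
```

Because the level is INFO, the `logging.debug` calls that appear in several functions below stay silent unless the level is lowered to DEBUG.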

Global Variables

  • initialized: Flag indicating whether the network has been initialized.
  • category_nodes: List of the nodes in the network.
  • questions: List of the questions.
  • model_saved: Guard flag indicating whether the model has been saved.

Ensuring the Output Directory Exists

output_dir = "plots"
os.makedirs(output_dir, exist_ok=True)  # creates the folder only if it is missing

Functions

Function for Splitting the CSV File

def split_csv(filename, chunk_size=1000, output_dir="data"):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    chunk_iter = pd.read_csv(filename, chunksize=chunk_size)
    for i, chunk in enumerate(chunk_iter):
        chunk.to_csv(os.path.join(output_dir, f"data_part_{i}.csv"), index=False)
        logging.info(f"Chunk {i} mit {len(chunk)} Zeilen gespeichert.")
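
The chunking idea can also be tried without pandas. The sketch below is a standard-library variant, not the documented implementation: the name `split_csv_stdlib`, the sample rows, and the temporary paths are all assumptions for illustration. It repeats the header in every part, which is also what `chunk.to_csv(..., index=False)` does.

```python
import csv
import os
import tempfile
from itertools import islice

def split_csv_stdlib(filename, chunk_size=1000, output_dir="data"):
    """Write consecutive chunks of `chunk_size` data rows, each with the header."""
    os.makedirs(output_dir, exist_ok=True)
    written = []
    with open(filename, newline="", encoding="utf-8") as src:
        reader = csv.reader(src)
        header = next(reader)
        index = 0
        while True:
            rows = list(islice(reader, chunk_size))   # next chunk_size rows
            if not rows:
                break
            path = os.path.join(output_dir, f"data_part_{index}.csv")
            with open(path, "w", newline="", encoding="utf-8") as dst:
                writer = csv.writer(dst)
                writer.writerow(header)
                writer.writerows(rows)
            written.append(path)
            index += 1
    return written

# Demo: 5 data rows with chunk_size=2 give parts of 2, 2 and 1 rows.
with tempfile.TemporaryDirectory() as tmp:
    source = os.path.join(tmp, "questions.csv")
    with open(source, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["Frage", "Kategorie"])
        w.writerows([[f"Question {i}", "General"] for i in range(5)])
    parts = split_csv_stdlib(source, chunk_size=2, output_dir=os.path.join(tmp, "data"))
    part_names = [os.path.basename(p) for p in parts]

print(part_names)  # ['data_part_0.csv', 'data_part_1.csv', 'data_part_2.csv']
```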

Improvement 1: Strengthening Connections for Frequently Asked Questions

def strengthen_question_connection(category_nodes, question, category):
    category_node = next((node for node in category_nodes if node.label == category), None)
    if category_node:
        for conn in category_node.connections:
            if conn.target_node.label == question:
                old_weight = conn.weight
                conn.weight += 0.1  # strengthen the connection
                conn.weight = np.clip(conn.weight, 0, 1.0)
                logging.info(f"Verstärkte Verbindung für Frage '{question}' in Kategorie '{category}': {old_weight:.4f} -> {conn.weight:.4f}")
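
The update rule is a fixed step of 0.1 followed by a clamp to [0, 1], so repeated questions saturate quickly. A minimal sketch of that arithmetic, using a hypothetical helper `strengthen` and stand-in objects instead of the real `Node` and `Connection` classes:

```python
from types import SimpleNamespace

def strengthen(weight, step=0.1, low=0.0, high=1.0):
    """Add `step` and clamp to [low, high], mirroring the np.clip call above."""
    return max(low, min(high, weight + step))

# Hypothetical question node attached to a category node.
question_node = SimpleNamespace(label="What is Python?")
conn = SimpleNamespace(target_node=question_node, weight=0.85)

history = []
for _ in range(3):          # the same question asked three times
    conn.weight = strengthen(conn.weight)
    history.append(round(conn.weight, 2))

print(history)  # [0.95, 1.0, 1.0]
```

Note that the documented function only finds a connection if the category node links to a node whose label equals the question text.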

Improvement 2: Extended Hebbian Learning Rule for Better Question Assignment

def enhanced_hebbian_learning(node, target_node, learning_rate=0.2, decay_factor=0.01):
    old_weight = None
    for conn in node.connections:
        if conn.target_node == target_node:
            old_weight = conn.weight
            conn.weight += learning_rate * node.activation * target_node.activation
            conn.weight = np.clip(conn.weight - decay_factor * conn.weight, 0, 1.0)
            break

    if old_weight is not None:
        logging.info(f"Hebb'sches Lernen angewendet: Gewicht {old_weight:.4f} -> {conn.weight:.4f}")
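
A worked step may make the order of operations clearer: the Hebbian gain is added first, then the decay is taken from the already increased weight, then the result is clamped. The helper name `enhanced_hebbian_step` and the input values are assumptions for illustration.

```python
def enhanced_hebbian_step(weight, pre, post, learning_rate=0.2, decay_factor=0.01):
    """One update: Hebbian gain, then proportional decay, then clamp to [0, 1]."""
    weight += learning_rate * pre * post        # co-activation strengthens the link
    weight -= decay_factor * weight             # decay scales with the new weight
    return max(0.0, min(1.0, weight))

# 0.5 + 0.2 * 0.9 * 0.8 = 0.644, then 0.644 * 0.99 = 0.63756
new_weight = enhanced_hebbian_step(weight=0.5, pre=0.9, post=0.8)
print(f"{new_weight:.4f}")  # 0.6376
```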

Improvement 3: Simulating Question Processing in the Network

def simulate_question_answering(category_nodes, question, questions):
    category = next((q['category'] for q in questions if q['question'] == question), None)
    if not category:
        logging.warning(f"Frage '{question}' nicht gefunden!")
        return None

    category_node = next((node for node in category_nodes if node.label == category), None)
    if category_node:
        propagate_signal(category_node, input_signal=0.9, emotion_weights={}, emotional_state=1.0)
        activation = category_node.activation
        if activation is None or activation <= 0:
            logging.warning(f"Kategorie '{category}' hat eine ungültige Aktivierung: {activation}")
            return 0.0  # return 0 if the activation failed
        logging.info(f"Verarbeite Frage: '{question}' → Kategorie: '{category}' mit Aktivierung {activation:.4f}")
        return activation  # duplicate logging output removed
    else:
        logging.warning(f"Kategorie '{category}' nicht im Netzwerk gefunden. Die Kategorie wird neu hinzugefügt!")
        return 0.0

Improvement 4: Finding the Best-Matching Question for a User Query

def find_question_by_keyword(questions, keyword):
    matching_questions = [q for q in questions if keyword.lower() in q['question'].lower()]
    return matching_questions if matching_questions else None
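
A short usage example, with the function repeated so that the block runs on its own. The sample questions are hypothetical; only the `question` and `category` keys come from the code above.

```python
def find_question_by_keyword(questions, keyword):
    # Same logic as the function above: case-insensitive substring match.
    matching_questions = [q for q in questions if keyword.lower() in q['question'].lower()]
    return matching_questions if matching_questions else None

# Hypothetical sample data in the {'question', 'category'} shape used throughout.
questions = [
    {"question": "What is Python?", "category": "Programming"},
    {"question": "How does hashing work?", "category": "Computer Science"},
]

hits = find_question_by_keyword(questions, "PYTHON")
misses = find_question_by_keyword(questions, "Rust")
print(hits)    # one match, despite the different letter case
print(misses)  # None, not an empty list
```

Callers therefore need a truthiness check before indexing the result, as `test_model` does.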

Improvement 5: Finding the Most Similar Question with Simple Similarity Metrics

def find_similar_question(questions, query):
    from difflib import get_close_matches
    question_texts = [q['question'] for q in questions]
    closest_matches = get_close_matches(query, question_texts, n=1, cutoff=0.6)

    if closest_matches:
        matched_question = next((q for q in questions if q['question'] == closest_matches[0]), None)
        return matched_question
    else:
        return {"question": "Keine passende Frage gefunden", "category": "Unbekannt"}
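
The matching itself is done by `difflib.get_close_matches` from the standard library. The sketch below, with made-up candidate strings, shows how the 0.6 cutoff behaves for a typo and for an unrelated query.

```python
from difflib import SequenceMatcher, get_close_matches

candidates = ["What is Python?", "How does hashing work?"]

# A typo still clears the 0.6 cutoff used above.
typo_match = get_close_matches("What is Pyton?", candidates, n=1, cutoff=0.6)
# An unrelated query returns an empty list.
no_match = get_close_matches("xyz", candidates, n=1, cutoff=0.6)

# The underlying similarity score lies between 0 and 1.
ratio = SequenceMatcher(None, "What is Pyton?", "What is Python?").ratio()
print(typo_match, no_match, round(ratio, 3))
```

The comparison is case-sensitive, so unlike the keyword search in Improvement 4, a query in a different letter case scores lower. On a miss, `find_similar_question` returns a placeholder dictionary rather than `None`.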

Improvement 6: Test Function for Checking the Model

def test_model(category_nodes, questions, query):
    matched_question = find_question_by_keyword(questions, query)
    if matched_question:
        logging.info(f"Gefundene Frage: {matched_question[0]['question']} -> Kategorie: {matched_question[0]['category']}")
        simulate_question_answering(category_nodes, matched_question[0]['question'], questions)
    else:
        logging.warning("Keine passende Frage gefunden.")

    similarity_question = find_similar_question(questions, query)
    logging.info(f"Ähnlichste Frage: {similarity_question['question']} -> Kategorie: {similarity_question['category']}")

NetworkX Functions for Causal Graphs

def build_causal_graph(category_nodes):
    G = nx.DiGraph()
    for node in category_nodes:
        G.add_node(node.label)
        for conn in node.connections:
            G.add_edge(node.label, conn.target_node.label, weight=conn.weight)
    return G

def analyze_causality_multiple(G, num_pairs=3):
    if len(G.nodes) < 2:
        logging.warning("Graph enthält nicht genügend Knoten für eine Analyse.")
        return

    for _ in range(num_pairs):
        start_node, target_node = random.sample(list(G.nodes), 2)  # random.sample needs a sequence; a NodeView is rejected on Python 3.11+
        logging.info(f"Analysiere kausale Pfade von '{start_node}' nach '{target_node}'")

        try:
            paths = list(nx.all_simple_paths(G, source=start_node, target=target_node))
            if paths:
                for path in paths:
                    logging.info(f"Kausaler Pfad: {' -> '.join(path)}")
            else:
                logging.info(f"Kein Pfad gefunden von '{start_node}' nach '{target_node}'")
        except nx.NetworkXNoPath:
            logging.warning(f"Kein direkter Pfad zwischen '{start_node}' und '{target_node}' gefunden.")

def analyze_node_influence(G):
    influence_scores = nx.pagerank(G, alpha=0.85)
    sorted_influences = sorted(influence_scores.items(), key=lambda x: x[1], reverse=True)
    for node, score in sorted_influences:
        logging.info(f"Knoten: {node}, Einfluss: {score:.4f}")

Function for Interventions Based on Pearl's Do-Operator

def do_intervention(node, new_value):
    logging.info(f"Intervention: Setze {node.label} auf {new_value}")
    node.activation = new_value
    for conn in node.connections:
        conn.target_node.activation += node.activation * conn.weight
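
The following sketch illustrates the reach of such an intervention. It uses a stripped-down stand-in for `Node` (connections are plain `(target, weight)` pairs here) and hypothetical labels. As in the function above, the forced value is pushed exactly one hop downstream.

```python
class Node:
    """Minimal stand-in for the Node class defined later in this document."""
    def __init__(self, label):
        self.label = label
        self.activation = 0.0
        self.connections = []   # list of (target_node, weight) pairs in this sketch

def do_intervention(node, new_value):
    # Overwrite the activation, then push it one hop downstream.
    node.activation = new_value
    for target, weight in node.connections:
        target.activation += node.activation * weight

stress, sleep, focus = Node("Stress"), Node("Sleep"), Node("Focus")
stress.connections = [(sleep, 0.5)]
sleep.connections = [(focus, 0.8)]

do_intervention(stress, 1.0)
print(sleep.activation)  # 0.5
print(focus.activation)  # 0.0, the effect stops after one hop
```

In Pearl's formulation, do(X = x) additionally removes the incoming edges of X. The function above approximates this by overwriting the activation, but it does not stop later propagation steps from changing the node again.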

Reinforcing Context-Dependent Learning

def contextual_causal_analysis(node, context_factors, learning_rate=0.1):
    context_factor = context_factors.get(node.label, 1.0)
    if node.activation > 0.8 and context_factor > 1.0:
        logging.info(f"Kausale Beziehung verstärkt für {node.label} aufgrund des Kontextes.")
        for conn in node.connections:
            conn.weight += learning_rate * context_factor
            conn.weight = np.clip(conn.weight, 0, 1.0)
            logging.info(f"Gewicht aktualisiert: {node.label} -> {conn.target_node.label}, Gewicht: {conn.weight:.4f}")

PyTorch Model for Causal Inference

class CausalInferenceNN(nn.Module):
    def __init__(self):
        super(CausalInferenceNN, self).__init__()
        self.fc1 = nn.Linear(10, 20)
        self.fc2 = nn.Linear(20, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        return self.fc2(x)
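
The model maps a 10-dimensional input through a 20-unit hidden layer to one output. A quick way to check the layer sizes is to count the trainable parameters by hand; the helper `linear_params` is a hypothetical name and the block does not require PyTorch.

```python
def linear_params(in_features, out_features):
    """Weights plus biases of one fully connected layer."""
    return in_features * out_features + out_features

fc1 = linear_params(10, 20)   # 10*20 weights + 20 biases
fc2 = linear_params(20, 1)    # 20*1 weights + 1 bias
total = fc1 + fc2
print(fc1, fc2, total)  # 220 21 241
```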

Debugging Function

def debug_connections(category_nodes):
    start_time = time.time()
    for node in category_nodes:
        logging.info(f"Knoten: {node.label}")
        for conn in node.connections:
            logging.info(f" Verbindung zu: {conn.target_node.label}, Gewicht: {conn.weight}")
    end_time = time.time()
    logging.info(f"debug_connections Ausführungszeit: {end_time - start_time:.4f} Sekunden")

Helper Functions

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def add_activation_noise(activation, noise_level=0.1):
    noise = np.random.normal(0, noise_level)
    return np.clip(activation + noise, 0.0, 1.0)

def decay_weights(category_nodes, decay_rate=0.002, forgetting_curve=0.95):
    for node in category_nodes:
        for conn in node.connections:
            conn.weight *= (1 - decay_rate) * forgetting_curve

def reward_connections(category_nodes, target_category, reward_factor=0.1):
    for node in category_nodes:
        if node.label == target_category:
            for conn in node.connections:
                conn.weight += reward_factor
                conn.weight = np.clip(conn.weight, 0, 1.0)

def apply_emotion_weight(activation, category_label, emotion_weights, emotional_state=1.0):
    emotion_factor = emotion_weights.get(category_label, 1.0) * emotional_state
    return activation * emotion_factor

def generate_simulated_answers(data, personality_distributions):
    simulated_answers = []
    for _, row in data.iterrows():
        category = row['Kategorie']
        mean = personality_distributions.get(category, 0.5)
        simulated_answer = np.clip(np.random.normal(mean, 0.2), 0.0, 1.0)
        simulated_answers.append(simulated_answer)
    return simulated_answers

def social_influence(category_nodes, social_network, influence_factor=0.1):
    for node in category_nodes:
        for conn in node.connections:
            social_impact = social_network.get(conn.target_node.label, 0) * influence_factor
            conn.weight += social_impact
            conn.weight = np.clip(conn.weight, 0, 1.0)

def update_emotional_state(emotional_state, emotional_change_rate=0.02):
    emotional_state += np.random.normal(0, emotional_change_rate)
    return np.clip(emotional_state, 0.7, 1.5)

def apply_contextual_factors(activation, node, context_factors):
    context_factor = context_factors.get(node.label, 1.0)
    return activation * context_factor * random.uniform(0.9, 1.1)

def long_term_memory(category_nodes, long_term_factor=0.01):
    for node in category_nodes:
        for conn in node.connections:
            conn.weight += long_term_factor * conn.weight
            conn.weight = np.clip(conn.weight, 0, 1.0)

def hebbian_learning(node, learning_rate=0.3, weight_limit=1.0, reg_factor=0.005):
    for connection in node.connections:
        old_weight = connection.weight
        connection.weight += learning_rate * node.activation * connection.target_node.activation
        connection.weight = np.clip(connection.weight, -weight_limit, weight_limit)
        connection.weight -= reg_factor * connection.weight
        node.activation_history.append(node.activation)  # store the activation
        connection.target_node.activation_history.append(connection.target_node.activation)
        logging.info(f"Hebb'sches Lernen: Gewicht von {old_weight:.4f} auf {connection.weight:.4f} erhöht")
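
The defaults of `decay_weights` imply a fairly fast forgetting rate, because `forgetting_curve` is applied on every call in addition to `decay_rate`. The sketch below works out the per-call multiplier and the resulting half-life, looking at `decay_weights` in isolation and ignoring the other helpers that raise weights.

```python
import math

decay_rate = 0.002
forgetting_curve = 0.95

factor = (1 - decay_rate) * forgetting_curve     # multiplier applied per call
half_life = math.log(0.5) / math.log(factor)     # calls until a weight halves

weight = 1.0
for _ in range(13):
    weight *= factor

print(f"factor={factor:.4f}, half-life={half_life:.1f} calls, w13={weight:.3f}")
```

With these defaults a weight roughly halves after 13 calls, and `decay_weights` is called once per epoch in `simulate_learning`.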

Classes for the Network Structure

class Connection:
    def __init__(self, target_node, weight=None):
        self.target_node = target_node
        self.weight = weight if weight is not None else random.uniform(0.1, 1.0)

class Node:
    def __init__(self, label):
        self.label = label
        self.connections = []
        self.activation = 0.0
        self.activation_history = []

    def add_connection(self, target_node, weight=None):
        self.connections.append(Connection(target_node, weight))

    def save_state(self):
        return {
            "label": self.label,
            "activation": self.activation,
            "activation_history": self.activation_history,
            "connections": [{"target": conn.target_node.label, "weight": conn.weight} for conn in self.connections]
        }

    @staticmethod
    def load_state(state, nodes_dict):
        node = Node(state["label"])
        node.activation = state["activation"]
        node.activation_history = state["activation_history"]
        for conn_state in state["connections"]:
            target_node = nodes_dict[conn_state["target"]]
            connection = Connection(target_node, conn_state["weight"])
            node.connections.append(connection)
        return node

class MemoryNode(Node):
    def __init__(self, label, memory_type="short_term"):
        super().__init__(label)
        self.memory_type = memory_type
        self.retention_time = {"short_term": 5, "mid_term": 20, "long_term": 100}[memory_type]
        self.time_in_memory = 0

    def decay(self, decay_rate, context_factors, emotional_state):
        context_factor = context_factors.get(self.label, 1.0)
        emotional_factor = emotional_state
        for conn in self.connections:
            if self.memory_type == "short_term":
                conn.weight *= (1 - decay_rate * 2 * context_factor * emotional_factor)
            elif self.memory_type == "mid_term":
                conn.weight *= (1 - decay_rate * context_factor * emotional_factor)
            elif self.memory_type == "long_term":
                conn.weight *= (1 - decay_rate * 0.5 * context_factor * emotional_factor)

    def promote(self, activation_threshold=0.7):
        if len(self.activation_history) == 0:
            return
        if self.memory_type == "short_term" and np.mean(self.activation_history[-5:]) > activation_threshold:
            self.memory_type = "mid_term"
            self.retention_time = 20
        elif self.memory_type == "mid_term" and np.mean(self.activation_history[-20:]) > activation_threshold:
            self.memory_type = "long_term"
            self.retention_time = 100

class CortexCreativus(Node):
    def __init__(self, label):
        super().__init__(label)

    def generate_new_ideas(self, category_nodes):
        new_ideas = []
        for node in category_nodes:
            if node.activation > 0.5:
                new_idea = f"New idea based on {node.label} with activation {node.activation}"
                new_ideas.append(new_idea)
        return new_ideas

class SimulatrixNeuralis(Node):
    def __init__(self, label):
        super().__init__(label)

    def simulate_scenarios(self, category_nodes):
        scenarios = []
        for node in category_nodes:
            if node.activation > 0.5:
                scenario = f"Simulated scenario based on {node.label} with activation {node.activation}"
                scenarios.append(scenario)
        return scenarios

class CortexCriticus(Node):
    def __init__(self, label):
        super().__init__(label)

    def evaluate_ideas(self, ideas):
        evaluated_ideas = []
        for idea in ideas:
            evaluation_score = random.uniform(0, 1)
            evaluation = f"Evaluated idea: {idea} - Score: {evaluation_score}"
            evaluated_ideas.append(evaluation)
        return evaluated_ideas

class LimbusAffectus(Node):
    def __init__(self, label):
        super().__init__(label)

    def apply_emotion_weight(self, ideas, emotional_state):
        weighted_ideas = []
        for idea in ideas:
            weighted_idea = f"Emotionally weighted idea: {idea} - Weight: {emotional_state}"
            weighted_ideas.append(weighted_idea)
        return weighted_ideas

class MetaCognitio(Node):
    def __init__(self, label):
        super().__init__(label)

    def optimize_system(self, category_nodes):
        for node in category_nodes:
            node.activation *= random.uniform(0.9, 1.1)

class CortexSocialis(Node):
    def __init__(self, label):
        super().__init__(label)

    def simulate_social_interactions(self, category_nodes):
        interactions = []
        for node in category_nodes:
            if node.activation > 0.5:
                interaction = f"Simulated social interaction based on {node.label} with activation {node.activation}"
                interactions.append(interaction)
        return interactions

def connect_new_brains_to_network(category_nodes, new_brains):
    for brain in new_brains:
        for node in category_nodes:
            brain.add_connection(node)
            node.add_connection(brain)
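
The promotion rule of `MemoryNode` can be followed with a small example. The function `next_memory_type` is a hypothetical, side-effect-free restatement of `promote()` that uses `statistics.mean` instead of `np.mean`; the activation values are made up.

```python
from statistics import mean

def next_memory_type(memory_type, activation_history, activation_threshold=0.7):
    """Return the memory stage after one promote() call, following the rules above."""
    if not activation_history:
        return memory_type
    if memory_type == "short_term" and mean(activation_history[-5:]) > activation_threshold:
        return "mid_term"
    if memory_type == "mid_term" and mean(activation_history[-20:]) > activation_threshold:
        return "long_term"
    return memory_type

history = [0.2, 0.3, 0.8, 0.9, 0.8, 0.9, 0.8]   # hypothetical activations

stage = next_memory_type("short_term", history)   # last 5 values average 0.84
print(stage)  # mid_term
first_stage = stage
stage = next_memory_type(stage, history)          # all 7 values average about 0.67
print(stage)  # mid_term
```

A node moves at most one stage per call, and the mid-term check looks at a longer window, so early low activations can hold a node back.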

Network Initialization

def initialize_quiz_network(categories):
    try:
        category_nodes = [Node(c) for c in categories]
        for node in category_nodes:
            for target_node in category_nodes:
                if node != target_node:
                    node.add_connection(target_node)
                    logging.debug(f"Verbindung hinzugefügt: {node.label} -> {target_node.label}")
        debug_connections(category_nodes)
        for node in category_nodes:
            logging.info(f"Knoten erstellt: {node.label}")
            for conn in node.connections:
                logging.info(f"  → Verbindung zu {conn.target_node.label} mit Gewicht {conn.weight:.4f}")
        return category_nodes
    except Exception as e:
        logging.error(f"Fehler bei der Netzwerk-Initialisierung: {e}")
        return []
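
The initialization builds a fully connected directed graph without self-loops, so n categories yield n * (n - 1) connections. A small check with hypothetical category labels:

```python
from itertools import permutations

categories = ["Math", "Physics", "Chemistry", "Biology"]   # hypothetical labels

# Every ordered pair of distinct categories becomes one directed connection.
edges = list(permutations(categories, 2))

n = len(categories)
print(len(edges), n * (n - 1))  # 12 12
```

The quadratic growth matters for the logging in this function, which writes one INFO line per connection.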

Signal Propagation

def propagate_signal(node, input_signal, emotion_weights, emotional_state=1.0, context_factors=None):
    node.activation = add_activation_noise(sigmoid(input_signal * random.uniform(0.8, 1.2)))
    node.activation_history.append(node.activation)  # store the activation
    node.activation = apply_emotion_weight(node.activation, node.label, emotion_weights, emotional_state)
    if context_factors:
        node.activation = apply_contextual_factors(node.activation, node, context_factors)
    logging.info(f"Signalpropagation für {node.label}: Eingangssignal {input_signal:.4f}")
    for connection in node.connections:
        logging.info(f"  → Signal an {connection.target_node.label} mit Gewicht {connection.weight:.4f}")
        connection.target_node.activation += node.activation * connection.weight

def propagate_signal_with_memory(node, input_signal, category_nodes, memory_nodes, context_factors, emotional_state):
    node.activation = add_activation_noise(sigmoid(input_signal))
    node.activation_history.append(node.activation)
    for connection in node.connections:
        connection.target_node.activation += node.activation * connection.weight
    for memory_node in memory_nodes:
        memory_node.time_in_memory += 1
        memory_node.promote()
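
Leaving out the random jitter and the activation noise, one propagation step reduces to a sigmoid followed by a weighted contribution to each target. The sketch below works through that deterministic core with assumed values for the input signal and the connection weight.

```python
import math

def sigmoid(x):
    return 1 / (1 + math.exp(-x))

input_signal = 0.9
weight = 0.5
emotion_factor = 1.0          # default emotion weight times emotional_state

activation = sigmoid(input_signal) * emotion_factor
contribution = activation * weight   # amount added to the target's activation

print(f"{activation:.4f} {contribution:.4f}")  # 0.7109 0.3555
```

Since the input signals used in this code are non-negative, the sigmoid output is at least 0.5 before noise and emotion weighting are applied.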

Simulation with Adjustments

def simulate_learning(data, category_nodes, personality_distributions, epochs=1, learning_rate=0.8, reward_interval=5, decay_rate=0.002, emotional_state=1.0, context_factors=None):
    if context_factors is None:
        context_factors = {}

    weights_history = {f"{node.label}{conn.target_node.label}": [] for node in category_nodes for conn in node.connections}
    activation_history = {node.label: [] for node in category_nodes}
    question_nodes = []

    for idx, row in data.iterrows():
        q_node = Node(row['Frage'])
        question_nodes.append(q_node)
        category_label = row['Kategorie'].strip()
        category_node = next((c for c in category_nodes if c.label == category_label), None)
        if category_node:
            q_node.add_connection(category_node)
            logging.debug(f"Verbindung hinzugefügt: {q_node.label} -> {category_node.label}")
        else:
            logging.warning(f"Warnung: Kategorie '{category_label}' nicht gefunden für Frage '{row['Frage']}'.")

    emotion_weights = {category: 1.0 for category in data['Kategorie'].unique()}
    social_network = {category: random.uniform(0.1, 1.0) for category in data['Kategorie'].unique()}

    for epoch in range(epochs):
        logging.info(f"\n--- Epoche {epoch + 1} ---")
        simulated_answers = generate_simulated_answers(data, personality_distributions)

        for node in category_nodes:
            node.activation_sum = 0.0
            node.activation_count = 0

        for node in category_nodes:
            propagate_signal(node, random.uniform(0.1, 0.9), emotion_weights, emotional_state, context_factors)
            node.activation_history.append(node.activation)  # store the activation

        for idx, q_node in enumerate(question_nodes):
            for node in category_nodes + question_nodes:
                node.activation = 0.0
            answer = simulated_answers[idx]
            propagate_signal(q_node, answer, emotion_weights, emotional_state, context_factors)
            q_node.activation_history.append(q_node.activation)  # store the activation
            hebbian_learning(q_node, learning_rate)

            for node in category_nodes:
                node.activation_sum += node.activation
                if node.activation > 0:
                    node.activation_count += 1

            for node in category_nodes:
                for conn in node.connections:
                    weights_history[f"{node.label}{conn.target_node.label}"].append(conn.weight)
                    logging.debug(f"Gewicht aktualisiert: {node.label} -> {conn.target_node.label}, Gewicht: {conn.weight}")

            # Apply causal reinforcement
            contextual_causal_analysis(q_node, context_factors, learning_rate)

        for node in category_nodes:
            if node.activation_count > 0:
                mean_activation = node.activation_sum / node.activation_count
                activation_history[node.label].append(mean_activation)
                logging.info(f"Durchschnittliche Aktivierung für Knoten {node.label}: {mean_activation:.4f}")
            else:
                activation_history[node.label].append(0.0)
                logging.info(f"Knoten {node.label} wurde in dieser Epoche nicht aktiviert.")

        if (epoch + 1) % reward_interval == 0:
            target_category = random.choice(data['Kategorie'].unique())
            reward_connections(category_nodes, target_category=target_category)

        decay_weights(category_nodes, decay_rate=decay_rate)
        social_influence(category_nodes, social_network)

    logging.info("Simulation abgeschlossen. Ergebnisse werden analysiert...")
    return activation_history, weights_history
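
The reward step runs only in epochs whose 1-based number is a multiple of `reward_interval`. The following sketch lists those epochs for an assumed run of 12 epochs:

```python
epochs = 12
reward_interval = 5

# Epoch numbers (1-based, as logged) in which reward_connections is called.
reward_epochs = [epoch + 1 for epoch in range(epochs) if (epoch + 1) % reward_interval == 0]
print(reward_epochs)  # [5, 10]
```

With the default arguments (`epochs=1`, `reward_interval=5`) the list is empty, so the reward never fires unless more epochs are requested.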

Simulation with Multi-Level Memory

def simulate_multilevel_memory(data, category_nodes, personality_distributions, epochs=1):
    short_term_memory = [MemoryNode(c.label, "short_term") for c in category_nodes]  # pass the label string, not the Node object
    mid_term_memory = []
    long_term_memory = []
    memory_nodes = short_term_memory + mid_term_memory + long_term_memory
    context_factors = {question: random.uniform(0.9, 1.1) for question in data['Frage'].unique()}
    emotional_state = 1.0
    for epoch in range(epochs):
        logging.info(f"\n--- Epoche {epoch + 1} ---")
        for node in short_term_memory:
            input_signal = random.uniform(0.1, 1.0)
            propagate_signal_with_memory(node, input_signal, category_nodes, memory_nodes, context_factors, emotional_state)
        for memory_node in memory_nodes:
            memory_node.decay(decay_rate=0.01, context_factors=context_factors, emotional_state=emotional_state)
        for memory_node in memory_nodes:
            memory_node.promote()
        short_term_memory, mid_term_memory, long_term_memory = update_memory_stages(memory_nodes)
        logging.info(f"Epoche {epoch + 1}: Kurzzeit {len(short_term_memory)}, Mittelzeit {len(mid_term_memory)}, Langzeit {len(long_term_memory)}")
    return short_term_memory, mid_term_memory, long_term_memory

def update_memory_stages(memory_nodes):
    short_term_memory = [node for node in memory_nodes if node.memory_type == "short_term"]
    mid_term_memory = [node for node in memory_nodes if node.memory_type == "mid_term"]
    long_term_memory = [node for node in memory_nodes if node.memory_type == "long_term"]
    return short_term_memory, mid_term_memory, long_term_memory

Plot Functions

def plot_activation_history(activation_history, filename="activation_history.png"):
    if not activation_history:
        logging.warning("No activation history to plot")
        return
    plt.figure(figsize=(12, 8))
    for label, activations in activation_history.items():
        if len(activations) > 0:
            plt.plot(range(1, len(activations) + 1), activations, label=label)
    plt.title("Entwicklung der Aktivierungen während des Lernens")
    plt.xlabel("Epoche")
    plt.ylabel("Aktivierung")
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot gespeichert unter: {os.path.join(output_dir, filename)}")

def plot_dynamics(activation_history, weights_history, filename="dynamics.png"):
    if not weights_history:
        logging.error("weights_history ist leer.")
        return

    plt.figure(figsize=(16, 12))
    plt.subplot(2, 2, 1)
    for label, activations in activation_history.items():
        if len(activations) > 0:
            plt.plot(range(1, len(activations) + 1), activations, label=label)
    plt.title("Entwicklung der Aktivierungen während des Lernens")
    plt.xlabel("Epoche")
    plt.ylabel("Aktivierung")
    plt.legend()
    plt.grid(True)

    plt.subplot(2, 2, 2)
    for label, weights in weights_history.items():
        if len(weights) > 0:
            plt.plot(range(1, len(weights) + 1), weights, label=label, alpha=0.7)
    plt.title("Entwicklung der Verbindungsgewichte während des Lernens")
    plt.xlabel("Epoche")
    plt.ylabel("Gewicht")
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True)

    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot gespeichert unter: {os.path.join(output_dir, filename)}")

def plot_memory_distribution(short_term_memory, mid_term_memory, long_term_memory, filename="memory_distribution.png"):
    counts = [len(short_term_memory), len(mid_term_memory), len(long_term_memory)]
    labels = ["Kurzfristig", "Mittelfristig", "Langfristig"]
    plt.figure(figsize=(8, 6))
    plt.bar(labels, counts, color=["red", "blue", "green"])
    plt.title("Verteilung der Gedächtnisknoten")
    plt.ylabel("Anzahl der Knoten")
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot gespeichert unter: {os.path.join(output_dir, filename)}")

def plot_activation_heatmap(activation_history, filename="activation_heatmap.png"):
    if not activation_history:
        logging.warning("No activation history to plot")
        return

    min_length = min(len(activations) for activations in activation_history.values())
    truncated_activations = {key: values[:min_length] for key, values in activation_history.items()}

    # Rows are epochs and columns are categories, matching the axis labels below.
    heatmap_data = np.array(list(truncated_activations.values())).T

    if heatmap_data.size == 0:
        logging.error("Heatmap-Daten sind leer. Überprüfen Sie die Aktivierungshistorie.")
        return

    # Create the figure only after the check so that no empty figure is left open.
    plt.figure(figsize=(12, 8))
    sns.heatmap(heatmap_data, cmap="YlGnBu", xticklabels=list(truncated_activations.keys()), yticklabels=False)
    plt.title("Heatmap der Aktivierungswerte")
    plt.xlabel("Kategorie")
    plt.ylabel("Epoche")
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot gespeichert unter: {os.path.join(output_dir, filename)}")

def plot_network_topology(category_nodes, new_brains, filename="network_topology.png"):
    G = nx.DiGraph()
    for node in category_nodes:
        G.add_node(node.label)
        for conn in node.connections:
            G.add_edge(node.label, conn.target_node.label, weight=conn.weight)
    for brain in new_brains:
        G.add_node(brain.label, color='red')
        for conn in brain.connections:
            G.add_edge(brain.label, conn.target_node.label, weight=conn.weight)

    pos = nx.spring_layout(G)
    edge_labels = {(u, v): d['weight'] for u, v, d in G.edges(data=True)}
    node_colors = [G.nodes[node].get('color', 'skyblue') for node in G.nodes()]

    nx.draw(G, pos, with_labels=True, node_size=3000, node_color=node_colors, font_size=10, font_weight="bold", edge_color="gray")
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels)
    plt.title("Netzwerktopologie")
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot gespeichert unter: {os.path.join(output_dir, filename)}")

Saving and Loading the Model

def save_model(category_nodes, filename="model.json"):
    model_data = {
        "nodes": [node.save_state() for node in category_nodes]
    }
    with open(filename, "w") as file:
        json.dump(model_data, file, indent=4)
    logging.info(f"Modell gespeichert in {filename}")

def save_model_with_questions_and_answers(category_nodes, questions, filename="model_with_qa.json"):
    global model_saved
    logging.info("Starte Speichern des Modells...")

    # Check whether anything has changed
    current_model_data = {
        "nodes": [node.save_state() for node in category_nodes],
        "questions": questions
    }

    if os.path.exists(filename):
        try:
            with open(filename, "r", encoding="utf-8") as file:
                existing_model_data = json.load(file)
                if existing_model_data == current_model_data:
                    logging.info("Keine Änderungen erkannt, erneutes Speichern übersprungen.")
                    return
        except Exception as e:
            logging.warning(f"Fehler beim Überprüfen des vorhandenen Modells: {e}")

    # Save the updated model
    try:
        with open(filename, "w", encoding="utf-8") as file:
            json.dump(current_model_data, file, indent=4)
        logging.info(f"Modell erfolgreich gespeichert unter {filename}.")
        model_saved = True  # set to True after a successful save
    except Exception as e:
        logging.error(f"Fehler beim Speichern des Modells: {e}")

def load_model_with_questions_and_answers(filename="model_with_qa.json"):
    global initialized
    if initialized:
        logging.info("Modell bereits initialisiert.")
        return None, None

    if not os.path.exists(filename):
        logging.warning(f"Datei {filename} nicht gefunden. Netzwerk wird initialisiert.")
        return None, None

    try:
        with open(filename, "r", encoding="utf-8") as file:
            model_data = json.load(file)

        nodes_dict = {node_data["label"]: Node(node_data["label"]) for node_data in model_data["nodes"]}

        for node_data in model_data["nodes"]:
            node = nodes_dict[node_data["label"]]
            node.activation = node_data.get("activation", 0.0)
            for conn_state in node_data["connections"]:
                target_node = nodes_dict.get(conn_state["target"])
                if target_node:
                    node.add_connection(target_node, conn_state["weight"])

        questions = model_data.get("questions", [])
        logging.info(f"Modell geladen mit {len(nodes_dict)} Knoten und {len(questions)} Fragen")
        initialized = True
        return list(nodes_dict.values()), questions

    except json.JSONDecodeError as e:
        logging.error(f"Fehler beim Parsen der JSON-Datei: {e}")
        return None, None
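The loader above reads a JSON file with a "nodes" list (each entry has a "label", an optional "activation", and "connections" made of "target" and "weight") and a "questions" list. The following is a minimal sketch of that layout and of the two-pass rebuild. MiniNode is a hypothetical, simplified stand-in for the Node class, and the helper names are assumptions made for illustration.

```python
import json
import os
import tempfile

class MiniNode:
    """Hypothetical, simplified stand-in for the document's Node class."""
    def __init__(self, label):
        self.label = label
        self.activation = 0.0
        self.connections = []  # list of (target_node, weight) pairs

    def add_connection(self, target, weight):
        self.connections.append((target, weight))

def to_model_data(nodes, questions):
    """Build the dictionary layout that the loader reads."""
    return {
        "nodes": [
            {
                "label": n.label,
                "activation": n.activation,
                "connections": [{"target": t.label, "weight": w} for t, w in n.connections],
            }
            for n in nodes
        ],
        "questions": questions,
    }

def from_model_data(model_data):
    """Rebuild nodes in two passes: create all nodes first, then wire the connections."""
    nodes = {d["label"]: MiniNode(d["label"]) for d in model_data["nodes"]}
    for d in model_data["nodes"]:
        node = nodes[d["label"]]
        node.activation = d.get("activation", 0.0)
        for conn in d.get("connections", []):
            target = nodes.get(conn["target"])
            if target is not None:  # skip connections that point to unknown labels
                node.add_connection(target, conn["weight"])
    return list(nodes.values()), model_data.get("questions", [])

def round_trip(nodes, questions):
    """Write the model to a temporary JSON file and load it again."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "model_with_qa.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(to_model_data(nodes, questions), f, indent=4)
        with open(path, "r", encoding="utf-8") as f:
            return from_model_data(json.load(f))
```

Creating every node before adding connections means a connection can point to a node that appears later in the file.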

Updating questions

def update_questions_with_answers(filename="model_with_qa.json"):
    # Use the same explicit encoding as the save function so umlauts survive on every platform
    with open(filename, "r", encoding="utf-8") as file:
        model_data = json.load(file)

    # Prompt only for questions that do not have an answer yet
    for question in model_data.get("questions", []):
        if "answer" not in question:
            question["answer"] = input(f"Gib die Antwort für: '{question['question']}': ")

    with open(filename, "w", encoding="utf-8") as file:
        json.dump(model_data, file, indent=4)
    logging.info(f"Fragen wurden mit Antworten aktualisiert und gespeichert in {filename}")

Finding the best answer

def find_best_answer(category_nodes, questions, query):
    matched_question = find_similar_question(questions, query)
    if matched_question:
        logging.info(f"Gefundene Frage: {matched_question['question']} -> Kategorie: {matched_question['category']}")
        answer = matched_question.get("answer", "Keine Antwort verfügbar")
        logging.info(f"Antwort: {answer}")
        return answer
    else:
        logging.warning("Keine passende Frage gefunden.")
        return None
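find_best_answer relies on find_similar_question, which is defined elsewhere in the code. As an illustration only, one way such a lookup could work is fuzzy string matching with difflib from the standard library. The names find_similar_question_sketch and answer_for and the cutoff of 0.6 are assumptions, not the project's implementation.

```python
import difflib

def find_similar_question_sketch(questions, query, cutoff=0.6):
    """Return the stored question dict whose text is closest to query, or None."""
    # Map the lower-cased question text back to its dictionary
    by_text = {q["question"].lower(): q for q in questions}
    matches = difflib.get_close_matches(query.lower(), list(by_text), n=1, cutoff=cutoff)
    return by_text[matches[0]] if matches else None

def answer_for(questions, query):
    """Mirror find_best_answer: return the answer text, or None if nothing matches."""
    match = find_similar_question_sketch(questions, query)
    if match is None:
        return None
    return match.get("answer", "Keine Antwort verfügbar")
```

A higher cutoff makes the lookup stricter, while a lower one tolerates more typos at the cost of more false matches.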

Creating the dashboard

def create_dashboard(category_nodes, activation_history, short_term_memory, mid_term_memory, long_term_memory):
    root = tk.Tk()
    root.title("Psyco Dashboard")

    # Show the activation history
    activation_frame = ttk.Frame(root, padding="10")
    activation_frame.pack(fill=tk.BOTH, expand=True)
    activation_label = ttk.Label(activation_frame, text="Aktivierungshistorie")
    activation_label.pack()
    if activation_history:
        for label, activations in activation_history.items():
            fig, ax = plt.subplots()
            ax.plot(range(1, len(activations) + 1), activations)
            ax.set_title(label)
            canvas = FigureCanvasTkAgg(fig, master=activation_frame)
            canvas.draw()
            canvas.get_tk_widget().pack()
    else:
        no_data_label = ttk.Label(activation_frame, text="Keine Aktivierungshistorie verfügbar.")
        no_data_label.pack()

    # Show the memory distribution
    memory_frame = ttk.Frame(root, padding="10")
    memory_frame.pack(fill=tk.BOTH, expand=True)
    memory_label = ttk.Label(memory_frame, text="Gedächtnisverteilung")
    memory_label.pack()
    memory_counts = [len(short_term_memory), len(mid_term_memory), len(long_term_memory)]
    labels = ["Kurzfristig", "Mittelfristig", "Langfristig"]
    fig, ax = plt.subplots()
    ax.bar(labels, memory_counts, color=["red", "blue", "green"])
    ax.set_title("Verteilung der Gedächtnisknoten")
    ax.set_ylabel("Anzahl der Knoten")
    canvas = FigureCanvasTkAgg(fig, master=memory_frame)
    canvas.draw()
    canvas.get_tk_widget().pack()

    # Show the network topology
    topology_frame = ttk.Frame(root, padding="10")
    topology_frame.pack(fill=tk.BOTH, expand=True)
    topology_label = ttk.Label(topology_frame, text="Netzwerktopologie")
    topology_label.pack()
    G = nx.DiGraph()
    for node in category_nodes:
        G.add_node(node.label)
        for conn in node.connections:
            G.add_edge(node.label, conn.target_node.label, weight=conn.weight)
    pos = nx.spring_layout(G)
    edge_labels = {(u, v): d['weight'] for u, v, d in G.edges(data=True)}
    node_colors = ['skyblue' for _ in G.nodes()]
    fig, ax = plt.subplots()
    nx.draw(G, pos, with_labels=True, node_size=3000, node_color=node_colors, font_size=10, font_weight="bold", edge_color="gray", ax=ax)
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, ax=ax)
    ax.set_title("Netzwerktopologie")
    canvas = FigureCanvasTkAgg(fig, master=topology_frame)
    canvas.draw()
    canvas.get_tk_widget().pack()

    # Show the heatmap of activation values
    heatmap_frame = ttk.Frame(root, padding="10")
    heatmap_frame.pack(fill=tk.BOTH, expand=True)
    heatmap_label = ttk.Label(heatmap_frame, text="Heatmap der Aktivierungswerte")
    heatmap_label.pack()
    if activation_history:
        min_length = min(len(activations) for activations in activation_history.values())
        truncated_activations = {key: values[:min_length] for key, values in activation_history.items()}
        # Transpose so that rows are epochs and columns are categories, matching the axis labels below
        heatmap_data = np.array(list(truncated_activations.values())).T
        if heatmap_data.size > 0:
            fig, ax = plt.subplots()
            sns.heatmap(heatmap_data, cmap="YlGnBu", xticklabels=list(truncated_activations.keys()), yticklabels=False, ax=ax)
            ax.set_title("Heatmap der Aktivierungswerte")
            ax.set_xlabel("Kategorie")
            ax.set_ylabel("Epoche")
            canvas = FigureCanvasTkAgg(fig, master=heatmap_frame)
            canvas.draw()
            canvas.get_tk_widget().pack()
        else:
            no_data_label = ttk.Label(heatmap_frame, text="Heatmap-Daten sind leer. Überprüfen Sie die Aktivierungshistorie.")
            no_data_label.pack()
    else:
        no_data_label = ttk.Label(heatmap_frame, text="Keine Aktivierungshistorie verfügbar.")
        no_data_label.pack()

    root.mainloop()

Processing the CSV file

def process_csv_in_chunks(filename, chunk_size=10000):
    global category_nodes, questions
    logging.info(f"Beginne Verarbeitung der Datei: {filename}")

    try:
        # Check that the file exists
        if not os.path.exists(filename):
            logging.error(f"Datei {filename} nicht gefunden.")
            return None

        all_chunks = []
        for chunk in pd.read_csv(filename, chunksize=chunk_size, encoding="utf-8", on_bad_lines='skip'):
            logging.info(f"Chunk mit {len(chunk)} Zeilen gelesen.")
            if 'Frage' not in chunk.columns or 'Kategorie' not in chunk.columns or 'Antwort' not in chunk.columns:
                logging.error("CSV-Datei enthält nicht die erwarteten Spalten: 'Frage', 'Kategorie', 'Antwort'")
                return None

            all_chunks.append(chunk)

        data = pd.concat(all_chunks, ignore_index=True)
        logging.info(f"Alle Chunks erfolgreich verarbeitet. Gesamtzeilen: {len(data)}")

        return data

    except pd.errors.EmptyDataError:
        logging.error("CSV-Datei ist leer.")
    except pd.errors.ParserError as e:
        logging.error(f"Parsing-Fehler in CSV-Datei: {e}")
    except Exception as e:
        logging.error(f"Unerwarteter Fehler beim Verarbeiten der Datei: {e}")

    return None

Processing single entries

def process_single_entry(question, category, answer):
    global category_nodes, questions

    # Make sure the global variables are initialized
    if category_nodes is None:
        category_nodes = []
        logging.warning("Kategorie-Knotenliste war None, wurde nun initialisiert.")

    if questions is None:
        questions = []
        logging.warning("Fragenliste war None, wurde nun initialisiert.")

    # Check whether the category already exists
    if not any(node.label == category for node in category_nodes):
        category_nodes.append(Node(category))
        logging.info(f"Neue Kategorie '{category}' dem Netzwerk hinzugefügt.")

    # Add the question, category, and answer to the list
    questions.append({"question": question, "category": category, "answer": answer})
    logging.info(f"Neue Frage hinzugefügt: '{question}' -> Kategorie: '{category}'")

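Processing the CSV file with Dask

def process_csv_with_dask(filename, chunk_size=10000):
    try:
        # Note: Dask's blocksize is measured in bytes, not in rows
        ddf = dd.read_csv(filename, blocksize=chunk_size)
        ddf = ddf.astype({'Kategorie': 'category'})

        # Select the columns by name so the column order in the CSV file does not matter
        for row in ddf[['Frage', 'Kategorie', 'Antwort']].itertuples(index=False, name=None):
            process_single_entry(row[0], row[1], row[2])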

        logging.info("Alle Chunks erfolgreich mit Dask verarbeitet.")
    except Exception as e:
        logging.error(f"Fehler beim Verarbeiten der Datei mit Dask: {e}")

Saving to SQLite

def save_to_sqlite(filename, db_name="dataset.db"):
    conn = sqlite3.connect(db_name)
    try:
        # if_exists="append" adds rows on every call, so importing the same file twice duplicates the data
        for chunk in pd.read_csv(filename, chunksize=10000):
            chunk.to_sql("qa_data", conn, if_exists="append", index=False)
            logging.info(f"Chunk mit {len(chunk)} Zeilen gespeichert.")
    finally:
        # Close the connection even if reading or writing fails
        conn.close()
    logging.info("CSV-Daten wurden erfolgreich in SQLite gespeichert.")

Loading from SQLite

def load_from_sqlite(db_name="dataset.db"):
    conn = sqlite3.connect(db_name)
    try:
        query = "SELECT Frage, Kategorie, Antwort FROM qa_data"
        return pd.read_sql_query(query, conn)
    finally:
        # Close the connection even if the query fails
        conn.close()

Saving a partial model

def save_partial_model(filename="partial_model.json"):
    # Uses the global category_nodes and questions lists
    model_data = {
        "nodes": [node.save_state() for node in category_nodes],
        "questions": questions
    }
    with open(filename, "w", encoding="utf-8") as file:
        json.dump(model_data, file, indent=4)
    logging.info("Teilmodell gespeichert.")

Lazily loading the CSV file

def lazy_load_csv(filename, chunk_size=10000):
    for chunk in pd.read_csv(filename, chunksize=chunk_size):
        for _, row in chunk.iterrows():
            yield row['Frage'], row['Kategorie'], row['Antwort']
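Because lazy_load_csv is a generator, rows are only read when the caller asks for them. The sketch below shows the same idea with csv.DictReader from the standard library instead of pandas (a deliberate substitution to keep the example free of dependencies). The names lazy_rows, first_n, and demo and the sample rows are hypothetical.

```python
import csv
import itertools
import os
import tempfile

def lazy_rows(filename):
    """Yield (Frage, Kategorie, Antwort) tuples one row at a time."""
    with open(filename, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            yield row["Frage"], row["Kategorie"], row["Antwort"]

def first_n(filename, n):
    """Consume only the first n rows; later rows are never parsed."""
    rows = lazy_rows(filename)
    try:
        return list(itertools.islice(rows, n))
    finally:
        rows.close()  # release the file handle right away

def demo():
    """Write a three-row sample file and read back only the first two rows."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "data.csv")
        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["Frage", "Kategorie", "Antwort"])
            writer.writerows([
                ["1+1?", "Mathe", "2"],
                ["H2O?", "Chemie", "Wasser"],
                ["Hauptstadt von Italien?", "Geografie", "Rom"],
            ])
        return first_n(path, 2)
```

The caller decides how many rows to consume, which keeps memory use independent of the file size.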

Main function

def main():
    start_time = time.time()
    category_nodes, questions = load_model_with_questions_and_answers("model_with_qa.json")

    if category_nodes is None:
        csv_file = "data.csv"
        data = process_csv_in_chunks(csv_file)
        if data is None:
            logging.error("Fehler beim Laden der CSV-Datei.")
            return

        if len(data) > 1000:
            logging.info("Datei hat mehr als 1000 Zeilen. Aufteilen in kleinere Dateien...")
            split_csv(csv_file)

            # Process each split file
            data_dir = "data"
            for filename in os.listdir(data_dir):
                if filename.endswith(".csv"):
                    file_path = os.path.join(data_dir, filename)
                    logging.info(f"Verarbeite Datei: {file_path}")

                    data = process_csv_in_chunks(file_path)
                    if data is None:
                        logging.error("Fehler beim Laden der CSV-Datei.")
                        return

                    categories = data['Kategorie'].unique()
                    category_nodes = initialize_quiz_network(categories)
                    questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]

                    personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}
                    activation_history, weights_history = simulate_learning(data, category_nodes, personality_distributions)

                    save_model_with_questions_and_answers(category_nodes, questions)
        else:
            logging.info("Datei hat weniger als 1000 Zeilen. Keine Aufteilung erforderlich.")
            categories = data['Kategorie'].unique()
            category_nodes = initialize_quiz_network(categories)
            questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]

            personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}
            activation_history, weights_history = simulate_learning(data, category_nodes, personality_distributions)

            save_model_with_questions_and_answers(category_nodes, questions)

    end_time = time.time()
    logging.info(f"Simulation abgeschlossen. Gesamtdauer: {end_time - start_time:.2f} Sekunden")

Starting the simulation from the GUI

def run_simulation_from_gui(learning_rate, decay_rate, reward_interval, epochs):
    global model_saved
    model_saved = False  # Force a save after training

    start_time = time.time()
    csv_file = "data.csv"

    category_nodes, questions = load_model_with_questions_and_answers("model_with_qa.json")

    if category_nodes is None:
        data = process_csv_in_chunks(csv_file)
        if not isinstance(data, pd.DataFrame):
            logging.error("Fehler beim Laden der CSV-Datei. Erwarteter DataFrame wurde nicht zurückgegeben.")
            return

        if len(data) > 1000:
            logging.info("Datei hat mehr als 1000 Zeilen. Aufteilen in kleinere Dateien...")
            split_csv(csv_file)

            # Process each split file
            data_dir = "data"
            for filename in os.listdir(data_dir):
                if filename.endswith(".csv"):
                    file_path = os.path.join(data_dir, filename)
                    logging.info(f"Verarbeite Datei: {file_path}")

                    data = process_csv_in_chunks(file_path)
                    if not isinstance(data, pd.DataFrame):
                        logging.error("Fehler beim Laden der CSV-Datei. Erwarteter DataFrame wurde nicht zurückgegeben.")
                        return

                    categories = data['Kategorie'].unique()
                    category_nodes = initialize_quiz_network(categories)
                    questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]

                    personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}
                    activation_history, weights_history = simulate_learning(
                        data, category_nodes, personality_distributions,
                        epochs=int(epochs),
                        learning_rate=float(learning_rate),
                        reward_interval=int(reward_interval),
                        decay_rate=float(decay_rate)
                    )

                    save_model_with_questions_and_answers(category_nodes, questions)
        else:
            logging.info("Datei hat weniger als 1000 Zeilen. Keine Aufteilung erforderlich.")
            categories = data['Kategorie'].unique()
            category_nodes = initialize_quiz_network(categories)
            questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]

            personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}
            activation_history, weights_history = simulate_learning(
                data, category_nodes, personality_distributions,
                epochs=int(epochs),
                learning_rate=float(learning_rate),
                reward_interval=int(reward_interval),
                decay_rate=float(decay_rate)
            )

            save_model_with_questions_and_answers(category_nodes, questions)
    else:
        data = process_csv_in_chunks(csv_file)
        if not isinstance(data, pd.DataFrame):
            logging.error("Fehler beim Laden der CSV-Datei. Erwarteter DataFrame wurde nicht zurückgegeben.")
            return

        logging.info(f"Anzahl der Zeilen in der geladenen CSV: {len(data)}")

        personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}

        activation_history, weights_history = simulate_learning(
            data, category_nodes, personality_distributions,
            epochs=int(epochs),
            learning_rate=float(learning_rate),
            reward_interval=int(reward_interval),
            decay_rate=float(decay_rate)
        )

        save_model_with_questions_and_answers(category_nodes, questions)

    end_time = time.time()
    logging.info(f"Simulation abgeschlossen. Gesamtdauer: {end_time - start_time:.2f} Sekunden")
    messagebox.showinfo("Ergebnis", f"Simulation abgeschlossen! Dauer: {end_time - start_time:.2f} Sekunden")

Initializing the network asynchronously

def async_initialize_network():
    global category_nodes, questions, model_saved
    logging.info("Starte Initialisierung des Netzwerks...")

    category_nodes, questions = load_model_with_questions_and_answers("model_with_qa.json")

    if category_nodes is None:
        category_nodes = []
        logging.warning("Keine gespeicherten Kategorien gefunden. Neues Netzwerk wird erstellt.")
        model_saved = False  # Reset the save flag

    if questions is None:
        questions = []
        logging.warning("Keine gespeicherten Fragen gefunden. Neues Fragen-Array wird erstellt.")
        model_saved = False  # Reset the save flag

    if not category_nodes:
        csv_file = "data.csv"
        data = process_csv_in_chunks(csv_file)
        if isinstance(data, pd.DataFrame):
            if len(data) > 1000:
                logging.info("Datei hat mehr als 1000 Zeilen. Aufteilen in kleinere Dateien...")
                split_csv(csv_file)

                # Process each split file
                data_dir = "data"
                for filename in os.listdir(data_dir):
                    if filename.endswith(".csv"):
                        file_path = os.path.join(data_dir, filename)
                        logging.info(f"Verarbeite Datei: {file_path}")

                        data = process_csv_in_chunks(file_path)
                        if isinstance(data, pd.DataFrame):
                            categories = data['Kategorie'].unique()
                            category_nodes = initialize_quiz_network(categories)
                            questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]
                            logging.info("Netzwerk aus CSV-Daten erfolgreich erstellt.")
                            model_saved = False  # Reset the save flag
            else:
                logging.info("Datei hat weniger als 1000 Zeilen. Keine Aufteilung erforderlich.")
                categories = data['Kategorie'].unique()
                category_nodes = initialize_quiz_network(categories)
                questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]
                logging.info("Netzwerk aus CSV-Daten erfolgreich erstellt.")
                model_saved = False  # Reset the save flag
        else:
            logging.error("Fehler beim Laden der CSV-Daten. Netzwerk konnte nicht initialisiert werden.")
            return

    save_model_with_questions_and_answers(category_nodes, questions)
    logging.info("Netzwerk erfolgreich initialisiert.")

Starting the GUI

def start_gui():
    def start_simulation():
        try:
            threading.Thread(target=run_simulation_from_gui, args=(0.8, 0.002, 5, 10), daemon=True).start()
            messagebox.showinfo("Info", "Simulation gestartet!")
            logging.info("Simulation gestartet")
        except Exception as e:
            logging.error(f"Fehler beim Start der Simulation: {e}")
            messagebox.showerror("Fehler", f"Fehler: {e}")

    root = tk.Tk()
    root.title("DRLCogNet GUI")
    root.geometry("400x300")

    header_label = tk.Label(root, text="Simulationseinstellungen", font=("Helvetica", 16))
    header_label.pack(pady=10)

    start_button = tk.Button(root, text="Simulation starten", command=start_simulation)
    start_button.pack(pady=20)

    root.mainloop()

Main program

if __name__ == "__main__":
    # Start the initialization in a background thread
    threading.Thread(target=async_initialize_network, daemon=True).start()
    start_gui()
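run_simulation_from_gui runs in a worker thread and calls messagebox.showinfo itself. Calling Tkinter from a thread other than the one running mainloop is widely discouraged and can fail depending on the Tcl build. A common alternative is to pass results through a queue.Queue and let the main thread poll it with root.after. The following is a minimal sketch of that pattern. The names worker, drain, and poll and the polling interval are assumptions, not part of the project.

```python
import queue
import threading

def worker(result_queue, duration_seconds):
    """Runs in a background thread: report results via the queue, never touch Tk here."""
    result_queue.put(f"Simulation abgeschlossen! Dauer: {duration_seconds:.2f} Sekunden")

def drain(result_queue):
    """Return all messages currently waiting in the queue without blocking."""
    messages = []
    while True:
        try:
            messages.append(result_queue.get_nowait())
        except queue.Empty:
            return messages

def poll(root, result_queue, show, interval_ms=100):
    """Runs on the Tk main thread: show pending messages, then reschedule itself."""
    for message in drain(result_queue):
        show(message)
    # root.after schedules the next poll on the main thread's event loop
    root.after(interval_ms, poll, root, result_queue, show, interval_ms)
```

Under these assumptions, start_gui could call poll(root, result_queue, lambda m: messagebox.showinfo("Ergebnis", m)) once before root.mainloop(), and the worker would only ever put strings into the queue.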

Questions about the database (SQLite)

Is the database created in memory?

No. save_to_sqlite calls sqlite3.connect with a file name (dataset.db by default), so SQLite creates a database file on disk in the current working directory. A database that lives only in memory would require the special name :memory:, which the code does not use.
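For reference, sqlite3.connect creates or opens a file when it is given an ordinary path, and keeps the database purely in memory only for the special name :memory:. The following minimal sketch makes the difference visible. The helper names are hypothetical, and the table layout simply mirrors the three columns used in this document.

```python
import os
import sqlite3
import tempfile

def database_file_exists_after_connect(db_name):
    """Connect, create a table, close, and report whether a file named db_name exists."""
    conn = sqlite3.connect(db_name)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS qa_data (Frage TEXT, Kategorie TEXT, Antwort TEXT)")
        conn.commit()
    finally:
        conn.close()
    return os.path.exists(db_name)

def demo():
    """Compare a path-based database with an in-memory one."""
    with tempfile.TemporaryDirectory() as tmp:
        on_disk = database_file_exists_after_connect(os.path.join(tmp, "dataset.db"))
    in_memory = database_file_exists_after_connect(":memory:")
    return on_disk, in_memory
```

An in-memory database disappears as soon as its connection is closed, so it would not suit the separate save and load functions used here.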

How is the database created?

The database is created by opening a connection to it with sqlite3.connect. If the file dataset.db does not exist, SQLite creates it. The CSV data is then read in chunks and appended to the qa_data table.

How is data loaded into the database?

The CSV file is read in chunks of 10,000 rows, and each chunk is written to the qa_data table with pandas' to_sql. Because to_sql is called with if_exists="append", importing the same CSV file twice stores every row twice.
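The chunked write can also be sketched without pandas. The example below is a stdlib-only substitute for to_sql (not what the project uses): it reads the CSV with csv.DictReader, groups rows with itertools.islice, and inserts each batch with executemany. The function names, the batch size, and the sample rows are assumptions.

```python
import csv
import itertools
import os
import sqlite3
import tempfile

def csv_to_sqlite_in_batches(csv_path, conn, batch_size=2):
    """Insert CSV rows into qa_data batch by batch; return the number of batches."""
    conn.execute("CREATE TABLE IF NOT EXISTS qa_data (Frage TEXT, Kategorie TEXT, Antwort TEXT)")
    batches = 0
    with open(csv_path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        while True:
            batch = [(r["Frage"], r["Kategorie"], r["Antwort"])
                     for r in itertools.islice(reader, batch_size)]
            if not batch:
                break
            conn.executemany("INSERT INTO qa_data VALUES (?, ?, ?)", batch)
            conn.commit()
            batches += 1
    return batches

def demo():
    """Load three sample rows in batches of two, then query one category."""
    with tempfile.TemporaryDirectory() as tmp:
        csv_path = os.path.join(tmp, "data.csv")
        with open(csv_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["Frage", "Kategorie", "Antwort"])
            writer.writerows([["1+1?", "Mathe", "2"], ["H2O?", "Chemie", "Wasser"], ["2*3?", "Mathe", "6"]])
        conn = sqlite3.connect(":memory:")  # in-memory database, used only for this demo
        try:
            batches = csv_to_sqlite_in_batches(csv_path, conn)
            rows = conn.execute(
                "SELECT Frage, Kategorie, Antwort FROM qa_data WHERE Kategorie = ? ORDER BY rowid",
                ("Mathe",),
            ).fetchall()
        finally:
            conn.close()
    return batches, rows
```

Committing once per batch keeps each transaction small. The parameterized query in demo also shows how a single category could be read back without loading the whole table.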

How is data loaded from the database?

To load the data, load_from_sqlite connects to the SQLite database and runs a SELECT on the qa_data table for the columns Frage, Kategorie, and Antwort. pandas' read_sql_query returns the result as a DataFrame.

Example code for using the database

# Save data to the database
save_to_sqlite("data.csv")

# Load data from the database
data = load_from_sqlite()

Conclusion

This documentation gives an overview of the code and of how the SQLite database is used to store and load data. The code is modular and supports processing CSV data and simulating learning on it in a neural network. The SQLite database is stored as a file on disk (dataset.db by default), is written in chunks, and is read back through pandas.