Spaces:
Running
Running
| # src/ontology_manager.py | |
| import json | |
| import networkx as nx | |
| from typing import Dict, List, Any, Optional, Union, Set | |
class OntologyManager:
    """
    Manages the ontology model and provides methods for querying and navigating
    the ontological structure.

    The ontology is read from a JSON document. The code requires a top-level
    ``"classes"`` mapping; ``"relationships"``, ``"instances"`` and ``"rules"``
    are treated as optional lists. Classes and instances become nodes of a
    ``networkx.MultiDiGraph``; ``subClassOf``, ``instanceOf`` and instance-level
    relationships become typed edges (edge attribute ``type``).
    """

    # Edge types that encode ontology structure rather than domain
    # relationships; they are hidden from get_relationships().
    _STRUCTURAL_EDGE_TYPES = ("instanceOf", "subClassOf")

    def __init__(self, ontology_path: str):
        """
        Initialize the ontology manager with a path to the ontology JSON file.

        Args:
            ontology_path: Path to the JSON file containing the ontology model
        """
        self.ontology_path = ontology_path
        self.ontology_data = self._load_ontology()
        self.graph = nx.MultiDiGraph()
        self._build_graph()

    def _load_ontology(self) -> Dict:
        """Load the ontology from the JSON file.

        The file is decoded as UTF-8 explicitly so that loading does not depend
        on the platform's locale default encoding.
        """
        with open(self.ontology_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _build_graph(self):
        """Build the ontology graph from the JSON data.

        Note that ``add_edge`` implicitly creates any endpoint that has not
        been added yet; such a node carries no attributes until (and unless)
        it is later declared as a class or instance.
        """
        # Classes and their subclass links.
        for class_id, class_data in self.ontology_data["classes"].items():
            self.graph.add_node(
                class_id,
                type="class",
                description=class_data.get("description", ""),
                properties=class_data.get("properties", []),
            )
            if "subClassOf" in class_data:
                self.graph.add_edge(
                    class_id, class_data["subClassOf"], type="subClassOf"
                )

        # Schema-level "relationships" are metadata only: they are read straight
        # from self.ontology_data where needed and add no edges here.

        # Instances, their class membership and their outgoing relationships.
        for instance in self.ontology_data.get("instances", []):
            instance_id = instance["id"]
            class_type = instance["type"]
            self.graph.add_node(
                instance_id,
                type="instance",
                class_type=class_type,
                properties=instance.get("properties", {}),
            )
            self.graph.add_edge(instance_id, class_type, type="instanceOf")

            for rel in instance.get("relationships", []):
                target = rel.get("target")
                rel_type = rel.get("type")
                # Entries missing either field are skipped silently.
                if target and rel_type:
                    self.graph.add_edge(instance_id, target, type=rel_type)

    def get_classes(self) -> List[str]:
        """Return a list of all class names in the ontology."""
        return list(self.ontology_data["classes"])

    def get_class_hierarchy(self) -> Dict[str, List[str]]:
        """Return a dictionary mapping each class to its direct subclasses.

        Every declared class appears as a key. A ``subClassOf`` parent that is
        not itself a declared class is ignored.
        """
        classes = self.ontology_data["classes"]
        hierarchy = {class_id: [] for class_id in classes}
        for class_id, class_data in classes.items():
            parent = class_data.get("subClassOf")
            if parent in hierarchy:
                hierarchy[parent].append(class_id)
        return hierarchy

    def get_instances_of_class(self, class_name: str, include_subclasses: bool = True) -> List[str]:
        """
        Get all instances of a given class.

        Args:
            class_name: The name of the class
            include_subclasses: Whether to include instances of subclasses

        Returns:
            A list of instance IDs, in graph node insertion order
        """
        wanted = {class_name}
        if include_subclasses:
            wanted.update(self._get_all_subclasses(class_name))
        # One pass over the nodes; ordering follows node insertion order and is
        # therefore stable between runs.
        return [
            node
            for node, attr in self.graph.nodes(data=True)
            if attr.get("type") == "instance" and attr.get("class_type") in wanted
        ]

    def _get_all_subclasses(self, class_name: str) -> List[str]:
        """Get all direct and transitive subclasses of a given class.

        Subclasses are returned in depth-first pre-order. Each class is listed
        at most once and ``class_name`` itself is never included, so a cyclic
        ``subClassOf`` chain terminates instead of recursing forever.
        """
        # Index parent -> children once instead of rescanning all edges at
        # every level of the hierarchy.
        children = {}
        for child, parent, data in self.graph.edges(data=True):
            if data.get("type") == "subClassOf":
                children.setdefault(parent, []).append(child)

        ordered = []
        seen = {class_name}
        # Reversed pushes keep siblings in their original left-to-right order.
        stack = list(reversed(children.get(class_name, [])))
        while stack:
            current = stack.pop()
            if current in seen:
                continue
            seen.add(current)
            ordered.append(current)
            stack.extend(reversed(children.get(current, [])))
        return ordered

    def get_relationships(self, entity_id: str, relationship_type: Optional[str] = None) -> List[Dict]:
        """
        Get all relationships for a given entity, optionally filtered by type.

        Structural edges (``instanceOf`` / ``subClassOf``) are not reported.

        Args:
            entity_id: The ID of the entity
            relationship_type: Optional relationship type to filter by

        Returns:
            A list of dictionaries containing relationship information.
            Outgoing entries carry a ``"target"`` key, incoming entries a
            ``"source"`` key. An unknown ``entity_id`` yields an empty list.
        """
        # Guard explicitly: networkx interprets a non-member argument as an
        # iterable of nodes, so an unknown string ID could otherwise match
        # nodes named after its individual characters.
        if entity_id not in self.graph:
            return []

        def wanted(rel_type):
            if rel_type in self._STRUCTURAL_EDGE_TYPES:
                return False
            return relationship_type is None or rel_type == relationship_type

        relationships = []
        for _, target, data in self.graph.out_edges(entity_id, data=True):
            rel_type = data.get("type")
            if wanted(rel_type):
                relationships.append({
                    "type": rel_type,
                    "target": target,
                    "direction": "outgoing",
                })
        for source, _, data in self.graph.in_edges(entity_id, data=True):
            rel_type = data.get("type")
            if wanted(rel_type):
                relationships.append({
                    "type": rel_type,
                    "source": source,
                    "direction": "incoming",
                })
        return relationships

    def find_paths(self, source_id: str, target_id: str, max_length: int = 3) -> List[List[Dict]]:
        """
        Find all paths between two entities up to a maximum length.

        Paths follow edge direction and may traverse structural edges
        (``instanceOf`` / ``subClassOf``) as well as domain relationships.

        Args:
            source_id: Starting entity ID
            target_id: Target entity ID
            max_length: Maximum path length (passed to networkx as ``cutoff``)

        Returns:
            A list of paths, where each path is a list of relationship
            dictionaries (``source``, ``target``, ``type``). When two
            consecutive nodes are joined by several parallel edges, one entry
            per edge is listed for that hop. If either endpoint is not in the
            graph the result is an empty list.
        """
        if source_id not in self.graph or target_id not in self.graph:
            return []

        paths = []
        seen_node_paths = set()
        simple_paths = nx.all_simple_paths(
            self.graph, source_id, target_id, cutoff=max_length
        )
        for node_path in simple_paths:
            # On a multigraph the same node sequence can be produced once per
            # parallel edge; report each node sequence only once.
            key = tuple(node_path)
            if key in seen_node_paths:
                continue
            seen_node_paths.add(key)

            path_with_edges = []
            for source, target in zip(node_path, node_path[1:]):
                edges = self.graph.get_edge_data(source, target)
                if not edges:
                    continue
                for data in edges.values():
                    path_with_edges.append({
                        "source": source,
                        "target": target,
                        "type": data.get("type", "unknown"),
                    })
            paths.append(path_with_edges)
        return paths

    def get_entity_info(self, entity_id: str) -> Dict:
        """
        Get detailed information about an entity.

        Args:
            entity_id: The ID of the entity

        Returns:
            A dictionary with entity information; empty if the entity is not
            in the graph. Nodes that are neither a class nor an instance
            (e.g. created implicitly as an edge endpoint) yield a copy of
            their raw node attributes.
        """
        if entity_id not in self.graph:
            return {}

        node_data = self.graph.nodes[entity_id]
        entity_type = node_data.get("type")

        if entity_type == "instance":
            class_type = node_data.get("class_type")
            class_info = self.ontology_data["classes"].get(class_type, {})
            return {
                "id": entity_id,
                "type": entity_type,
                "class": class_type,
                "class_description": class_info.get("description", ""),
                "properties": node_data.get("properties", {}),
                "relationships": self.get_relationships(entity_id),
            }

        if entity_type == "class":
            return {
                "id": entity_id,
                "type": entity_type,
                "description": node_data.get("description", ""),
                "properties": node_data.get("properties", []),
                "subclasses": self._get_all_subclasses(entity_id),
                "instances": self.get_instances_of_class(entity_id),
            }

        # Copy so callers cannot mutate the graph's own attribute dict.
        return dict(node_data)

    def get_text_representation(self) -> str:
        """
        Generate a text representation of the ontology for embedding.

        One chunk is produced per class, schema relationship, rule and
        instance, in that order. Missing ``"relationships"``, ``"rules"`` or
        ``"instances"`` sections are treated as empty.

        Returns:
            A string containing the textual representation of the ontology,
            with chunks separated by blank lines
        """
        text_chunks = []

        # Class definitions
        for class_id, class_data in self.ontology_data["classes"].items():
            lines = [
                f"Class: {class_id}\n",
                f"Description: {class_data.get('description', '')}\n",
            ]
            if "subClassOf" in class_data:
                lines.append(
                    f"{class_id} is a subclass of {class_data['subClassOf']}.\n"
                )
            if "properties" in class_data:
                lines.append(
                    f"{class_id} has properties: {', '.join(class_data['properties'])}.\n"
                )
            text_chunks.append("".join(lines))

        # Relationship definitions
        for rel in self.ontology_data.get("relationships", []):
            lines = [
                f"Relationship: {rel['name']}\n",
                f"Domain: {rel['domain']}, Range: {rel['range']}\n",
                f"Description: {rel.get('description', '')}\n",
                f"Cardinality: {rel.get('cardinality', 'many-to-many')}\n",
            ]
            if "inverse" in rel:
                lines.append(f"The inverse relationship is {rel['inverse']}.\n")
            text_chunks.append("".join(lines))

        # Rules
        for rule in self.ontology_data.get("rules", []):
            text_chunks.append(
                f"Rule: {rule.get('id', '')}\n"
                f"Description: {rule.get('description', '')}\n"
            )

        # Instance data
        for instance in self.ontology_data.get("instances", []):
            lines = [
                f"Instance: {instance['id']}\n",
                f"Type: {instance['type']}\n",
            ]

            props = []
            for key, value in instance.get("properties", {}).items():
                if isinstance(value, list):
                    props.append(f"{key}: {', '.join(str(v) for v in value)}")
                else:
                    props.append(f"{key}: {value}")
            if props:
                lines.append("Properties:\n- " + "\n- ".join(props) + "\n")

            rels = [
                f"{rel['type']} {rel['target']}"
                for rel in instance.get("relationships", [])
            ]
            if rels:
                lines.append("Relationships:\n- " + "\n- ".join(rels) + "\n")

            text_chunks.append("".join(lines))

        return "\n\n".join(text_chunks)

    def query_by_relationship(self, source_type: str, relationship: str, target_type: str) -> List[Dict]:
        """
        Query for instances connected by a specific relationship.

        Both ends are matched against the class hierarchy: instances of
        subclasses of ``source_type`` / ``target_type`` qualify as well.

        Args:
            source_type: Type of the source entity
            relationship: Type of relationship
            target_type: Type of the target entity

        Returns:
            A list of matching relationship dictionaries with the keys
            ``source``, ``source_properties``, ``relationship``, ``target``
            and ``target_properties``
        """
        # Treat the target side like the source side: subclasses count.
        target_classes = {target_type}
        target_classes.update(self._get_all_subclasses(target_type))

        results = []
        for source_id in self.get_instances_of_class(source_type):
            source_props = self.graph.nodes[source_id].get("properties", {})
            for rel in self.get_relationships(source_id, relationship):
                if rel["direction"] != "outgoing":
                    continue
                target_id = rel["target"]
                target_data = self.graph.nodes[target_id]
                if (target_data.get("type") == "instance"
                        and target_data.get("class_type") in target_classes):
                    results.append({
                        "source": source_id,
                        "source_properties": source_props,
                        "relationship": rel["type"],
                        "target": target_id,
                        "target_properties": target_data.get("properties", {}),
                    })
        return results

    def get_semantic_context(self, query: str) -> List[str]:
        """
        Retrieve relevant semantic context from the ontology based on a query.

        Classes and schema relationships are detected by case-insensitive
        substring match of their names against the query (no entity
        recognition or parsing). If nothing matches, a general overview of the
        ontology is returned instead.

        Args:
            query: The query string to analyze

        Returns:
            A list of text chunks providing relevant ontological context
        """
        query_lower = query.lower()
        relationships = self.ontology_data.get("relationships", [])
        context_chunks = []

        for class_id in self.get_classes():
            if class_id.lower() in query_lower:
                context_chunks.extend(self._class_context_chunks(class_id))

        for rel in relationships:
            if rel["name"].lower() in query_lower:
                context_chunks.append(self._relationship_context_chunk(rel))

        if not context_chunks:
            context_chunks.extend(self._general_context_chunks(relationships))

        return context_chunks

    def _class_context_chunks(self, class_id: str) -> List[str]:
        """Describe one class, plus up to three of its direct instances."""
        class_data = self.ontology_data["classes"][class_id]
        chunk = f"Class {class_id}: {class_data.get('description', '')}\n"
        if "subClassOf" in class_data:
            chunk += f"{class_id} is a subclass of {class_data['subClassOf']}.\n"
        if "properties" in class_data:
            chunk += f"{class_id} has properties: {', '.join(class_data['properties'])}.\n"
        chunks = [chunk]

        instances = self.get_instances_of_class(class_id, include_subclasses=False)[:3]
        if instances:
            instance_chunk = f"Examples of {class_id}:\n"
            for inst_id in instances:
                props = self.graph.nodes[inst_id].get("properties", {})
                if "name" in props:
                    instance_chunk += f"- {inst_id} ({props['name']})\n"
                else:
                    instance_chunk += f"- {inst_id}\n"
            chunks.append(instance_chunk)
        return chunks

    def _relationship_context_chunk(self, rel: Dict) -> str:
        """Describe one schema relationship, plus up to three example pairs."""
        chunk = f"Relationship {rel['name']}: {rel.get('description', '')}\n"
        chunk += f"This relationship connects {rel['domain']} to {rel['range']}.\n"

        examples = self.query_by_relationship(rel['domain'], rel['name'], rel['range'])[:3]
        if examples:
            chunk += "Examples:\n"
            for ex in examples:
                # Prefer a human-readable "name" property, fall back to the ID.
                source_name = ex["source_properties"].get("name", ex["source"])
                target_name = ex["target_properties"].get("name", ex["target"])
                chunk += f"- {source_name} {rel['name']} {target_name}\n"
        return chunk

    def _general_context_chunks(self, relationships: List[Dict]) -> List[str]:
        """Build the fallback overview: top-level classes and first five relationships."""
        chunks = []
        classes = self.ontology_data["classes"]

        # A class counts as top-level when it has no parent or its parent is
        # the literal class name "Entity".
        top_classes = [
            cls for cls, data in classes.items()
            if "subClassOf" not in data or data["subClassOf"] == "Entity"
        ]
        if top_classes:
            chunk = "Main classes in the ontology:\n"
            for cls in top_classes:
                chunk += f"- {cls}: {classes[cls].get('description', '')}\n"
            chunks.append(chunk)

        if relationships:
            chunk = "Key relationships in the ontology:\n"
            for rel in relationships[:5]:
                chunk += f"- {rel['name']}: {rel.get('description', '')}\n"
            chunks.append(chunk)
        return chunks