HMP / agents /mcp_server.py
GitHub Action
Sync from GitHub with Git LFS
bb410e4
# agents/mcp_server.py
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime, UTC
from tools.storage import Storage
app = FastAPI(title="HMP MCP-Agent API", version="0.2")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
db = Storage()
# ======== MODELS ========
class NoteInput(BaseModel):
text: str
tags: Optional[List[str]] = []
class NoteOutput(BaseModel):
id: int
text: str
tags: List[str]
source: str
read: int
timestamp: str
class DiaryInput(BaseModel):
text: str
tags: Optional[List[str]] = []
class DiaryOutput(BaseModel):
id: int
text: str
tags: List[str]
timestamp: str
class DiaryListOutput(BaseModel):
entries: List[DiaryOutput]
class ConceptInput(BaseModel):
name: str
description: Optional[str] = ""
class ConceptOutput(BaseModel):
concept_id: int
class Concept(BaseModel):
concept_id: int
name: str
description: Optional[str]
class LinkInput(BaseModel):
source_id: int
target_id: int
relation: str
class LinkOutput(BaseModel):
link_id: int
class Edge(BaseModel):
source_id: int
target_id: int
relation: str
class GraphExpansionOutput(BaseModel):
links: List[Edge]
class GraphExport(BaseModel):
nodes: List[Concept]
edges: List[Edge]
class GraphImportData(BaseModel):
nodes: List[Concept]
edges: List[Edge]
class ConceptUpdate(BaseModel):
name: Optional[str]
description: Optional[str]
class ConceptQueryOutput(BaseModel):
matches: List[Concept]
class NoteTagUpdate(BaseModel):
id: int
tags: List[str] = []
# ======== ROUTES ========
@app.get("/status")
def status():
return {"status": "ok", "timestamp": datetime.now(UTC).isoformat()}
@app.post("/diary/write", response_model=dict)
def write_diary(entry: DiaryInput):
db.write_diary_entry(entry.text, entry.tags)
return {"result": "entry saved"}
@app.get("/diary/read", response_model=DiaryListOutput)
def read_diary(limit: int = 5, tag: Optional[str] = None):
raw = db.read_diary_entries(limit=limit, tag_filter=tag)
return {
"entries": [
{
"id": r[0],
"text": r[1],
"tags": r[2].split(",") if r[2] else [],
"timestamp": r[3]
} for r in raw
]
}
@app.delete("/diary/delete/{entry_id}")
def delete_diary(entry_id: int):
db.delete_diary_entry_by_id(entry_id)
return {"result": f"entry {entry_id} deleted"}
@app.get("/diary/get_entry/{entry_id}", response_model=DiaryOutput)
def get_diary_entry(entry_id: int):
row = db.get_diary_entry(entry_id)
if row:
return {
"id": row[0],
"text": row[1],
"tags": row[2].split(",") if row[2] else [],
"timestamp": row[3]
}
raise HTTPException(status_code=404, detail="Entry not found")
@app.post("/diary/search_entries", response_model=DiaryListOutput)
def search_entries(query: str):
rows = db.search_diary_entries(query)
return {
"entries": [
{
"id": r[0],
"text": r[1],
"tags": r[2].split(",") if r[2] else [],
"timestamp": r[3]
}
for r in rows
]
}
@app.get("/diary/tag_stats", response_model=dict)
def tag_stats():
return db.get_tag_stats()
@app.get("/diary/export", response_model=DiaryListOutput)
def export_diary():
rows = db.export_diary()
return {
"entries": [
{
"id": r[0],
"text": r[1],
"tags": r[2].split(",") if r[2] else [],
"timestamp": r[3]
}
for r in rows
]
}
@app.post("/graph/add_concept", response_model=ConceptOutput)
def add_concept(concept: ConceptInput):
cid = db.add_concept(concept.name, concept.description)
return {"concept_id": cid}
@app.post("/graph/add_link", response_model=LinkOutput)
def add_link(link: LinkInput):
link_id = db.add_link(link.source_id, link.target_id, link.relation)
return {"link_id": link_id}
@app.get("/graph/expand", response_model=GraphExpansionOutput)
def expand_graph(start_id: int, depth: int = 1):
links = db.expand_concept_graph(start_id, depth)
return {"links": links}
@app.get("/graph/list_concepts", response_model=List[Concept])
def list_concepts():
rows = db.list_concepts()
return [
{"concept_id": r[0], "name": r[1], "description": r[2]}
for r in rows
]
@app.get("/graph/list_links", response_model=List[Edge])
def list_links():
return db.list_links()
@app.get("/graph/get_concept/{id}", response_model=Concept)
def get_concept(id: int):
concept = db.get_concept(id)
if concept:
return {"concept_id": concept[0], "name": concept[1], "description": concept[2]}
raise HTTPException(status_code=404, detail="Concept not found")
@app.delete("/graph/delete_concept/{id}")
def delete_concept(id: int):
db.delete_concept(id)
return {"result": f"concept {id} deleted"}
@app.delete("/graph/delete_link/{id}")
def delete_link(id: int):
db.delete_link(id)
return {"result": f"link {id} deleted"}
@app.put("/graph/update_concept/{id}")
def update_concept(id: int, update: ConceptUpdate):
db.update_concept(id, update.name, update.description)
return {"result": f"concept {id} updated"}
@app.post("/graph/merge_concepts")
def merge_concepts(source_id: int, target_id: int):
db.merge_concepts(source_id, target_id)
return {"result": f"concept {source_id} merged into {target_id}"}
@app.get("/graph/search_links", response_model=List[Edge])
def search_links(relation: str):
return db.search_links_by_relation(relation)
@app.get("/graph/search_concepts", response_model=List[Concept])
def search_concepts(query: str):
rows = db.search_concepts(query)
return [{"concept_id": r[0], "name": r[1], "description": r[2]} for r in rows]
@app.get("/graph/query_concept", response_model=ConceptQueryOutput)
def query_concept(name: str):
rows = db.query_concept(name)
return {
"matches": [
{"concept_id": r[0], "name": r[1], "description": r[2]}
for r in rows
]
}
@app.post("/graph/relate_concepts", response_model=LinkOutput)
def relate_concepts(source_name: str, target_name: str, relation: str):
sid = db.get_concept_id_by_name(source_name)
tid = db.get_concept_id_by_name(target_name)
if sid is None or tid is None:
raise HTTPException(status_code=404, detail="Concept not found")
link_id = db.add_link(sid, tid, relation)
return {"link_id": link_id}
@app.get("/graph/export", response_model=GraphExport)
def export_graph():
return db.export_graph()
@app.post("/graph/import")
def import_graph(graph_data: GraphImportData):
db.import_graph(graph_data)
return {"status": "ok"}
@app.post("/note/write", response_model=dict)
def write_note(note: NoteInput):
db.write_note(note.text, note.tags)
return {"result": "note saved"}
@app.get("/note/next", response_model=Optional[NoteOutput])
def get_next_note():
note = db.get_first_unread_note()
if note:
note_id, text, tags, source, read, timestamp = note
return {
"id": note_id,
"text": text,
"tags": tags.split(",") if tags else [],
"source": source,
"read": read,
"timestamp": timestamp
}
return None
@app.post("/note/mark_read", response_model=dict)
def mark_note_read(data: NoteTagUpdate):
db.mark_note_as_read(data.id)
return {"result": "ok"}
@app.post("/note/set_tags", response_model=dict)
def set_note_tags(data: NoteTagUpdate):
db.set_tags(data.id, data.tags)
return {"result": "ok"}
@app.get("/note/random", response_model=Optional[NoteOutput])
def get_random_note_by_tags(tags: Optional[List[str]] = None):
note = db.get_random_note_by_tags(tags or [])
if note:
note_id, text, note_tags, source, read, timestamp = note
return {
"id": note_id,
"text": text,
"tags": note_tags.split(",") if note_tags else [],
"source": source,
"read": read,
"timestamp": timestamp
}
return None
@app.get("/note/by_tag", response_model=List[NoteOutput])
def get_notes_by_tag(tag: str):
all_notes = db.read_notes(limit=1000)
return [
{
"id": note[0],
"text": note[1],
"tags": note[2].split(",") if note[2] else [],
"source": note[3],
"read": note[4],
"timestamp": note[5]
}
for note in all_notes if tag in (note[2].split(",") if note[2] else [])
]
@app.on_event("shutdown")
def shutdown():
db.close()