import sys
import os

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import re

import spacy
import tiktoken
from lemminflect import getLemma

from llm_optimizer import (
    optimize_with_llm,
    optimize_with_agent,
    get_accurate_token_count,
    PERSONAS,
)

# from fastmcp import FastMCP  # standalone package; using the FastMCP from Claude's MCP SDK instead
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.prompts import base
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from pydantic import Field

# ============================================================================
# spaCy Optimizer (kept separate as it's unique to this implementation)
# ============================================================================

class AdvancedPromptOptimizer:
    def __init__(self):
        self.nlp = spacy.load("en_core_web_sm")
        # Negations must survive optimization, or the prompt's meaning flips.
        self.nlp.Defaults.stop_words -= {"not", "no", "never"}
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
        self.negation_words = {"not", "no", "never", "without", "except"}

    def _mask_spans(self, s):
        """Replace spans that must never be rewritten (code, URLs, numeric
        comparators) with unique placeholders; return the masked text and
        the placeholder -> original mapping."""
        masks = {}

        def _mask(m):
            key = f"__MASK_{len(masks)}__"
            masks[key] = m.group(0)
            return key

        s = re.sub(r"```.*?```", _mask, s, flags=re.S)  # fenced code blocks
        s = re.sub(r"`[^`]+`", _mask, s)  # inline code
        s = re.sub(r"https?://\S+", _mask, s)  # URLs
        s = re.sub(r"\b(less than|at least|no more than)\b", _mask, s, flags=re.I)  # comparators
        return s, masks

    def _unmask_spans(self, s, masks):
        for k, v in masks.items():
            s = s.replace(k, v)
        return s

    def optimize(self, prompt: str, aggressiveness: float = 0.7) -> tuple:
        """Optimize a prompt and report before/after token counts."""
        masked_prompt, masks = self._mask_spans(prompt)
        optimized = self._apply_rules(masked_prompt, aggressiveness)
        optimized = self._linguistic_optimize(optimized, aggressiveness)
        optimized = self._unmask_spans(optimized, masks)
        optimized = re.sub(r"\s+", " ", optimized).strip()
        try:
            orig_tokens = len(self.tokenizer.encode(prompt))
            new_tokens = len(self.tokenizer.encode(optimized))
        except Exception:
            # Fall back to a rough whitespace count if encoding fails.
            orig_tokens = len(prompt.split())
            new_tokens = len(optimized.split())
        return optimized, orig_tokens, new_tokens

    def _apply_rules(self, text: str, aggressiveness: float) -> str:
        # Each rule is (pattern, replacement, priority); a rule fires only
        # when aggressiveness meets or exceeds its priority.
        rules = [
            (r"\s{2,}", " ", 0.0),
            (r"\b(\w+)\s+\1\b", r"\1", 0.0),
            (r"\b(advantages and disadvantages)\b", "pros/cons", 0.5),
            (r"\b(in a detailed manner|in a detailed way)\b", "", 0.7),
            (r"\b(I want to|I need to|I would like to)\b", "", 0.7),
            (r"\b(for example|e\.g\.|such as|i\.e\.)\b", "e.g.", 0.8),
            (r"\b(please\s+)?(kindly\s+)?(carefully|very|extremely|really|quite)\b", "", 0.8),
            (r"\b(can you|could you|would you)\b", "", 0.9),
            (r"\b(output|provide|give|return)\s+in\s+(JSON|json)\s+format\b", "JSON:", 1.0),
        ]
        for pattern, repl, priority in rules:
            if aggressiveness >= priority:
                text = re.sub(pattern, repl, text, flags=re.IGNORECASE)
        return text

    def _linguistic_optimize(self, text: str, aggressiveness: float) -> str:
        if not text.strip():
            return text
        doc = self.nlp(text)
        out = []
        for token in doc:
            # Preserve section-header labels (spaCy splits the trailing colon
            # into its own token, so match the bare word).
            if token.is_sent_start and token.text.lower() in ("deliverables", "constraints", "metrics"):
                out.append(token.text)
                continue
            if token.pos_ in ("PUNCT", "SPACE"):
                continue
            # Never drop numbers, named entities, or negations.
            if (token.like_num or token.ent_type_ or token.dep_ == "neg"
                    or token.text.lower() in self.negation_words):
                out.append(token.text)
                continue
            # Content words carry the semantic payload; keep them verbatim.
            if token.pos_ in ("PROPN", "NUM", "NOUN", "ADJ"):
                out.append(token.text)
                continue
            if token.pos_ == "VERB":
                if aggressiveness >= 0.8:
                    # Reduce verbs to their lemma at high aggressiveness.
                    lemma = getLemma(token.text, upos="VERB") or [token.lemma_]
                    out.append(lemma[0])
                else:
                    out.append(token.text)
                continue
            if token.pos_ in ("ADV", "DET", "PRON"):
                # Drop function words unless aggressiveness is low.
                if aggressiveness < 0.6:
                    out.append(token.text)
                continue
            out.append(token.text)
        return " ".join(out)
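
# Minimal usage sketch for the optimizer on its own (the prompt text below is
# illustrative only, not from the project):
#
#   optimizer = AdvancedPromptOptimizer()
#   optimized, before, after = optimizer.optimize(
#       "Could you please carefully explain the advantages and disadvantages "
#       "of microservices in a detailed manner?",
#       aggressiveness=0.8,
#   )
#   print(f"{before} -> {after} tokens: {optimized}")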
# ============================================================================
# MCP Server
# ============================================================================

mcp = FastMCP(name="PromptOptimizer", log_level="ERROR")


@mcp.tool(
    name="optimize_prompt",
    description="Optimizes a given prompt using various methods to reduce token count while preserving meaning.",
)
def optimize_prompt(
    prompt: str = Field(description="The prompt to optimize"),
    method: str = Field(default="simple", description="The optimization method to use. Can be 'simple', 'agent', or 'spacy'"),
    persona: str = Field(default="Default", description="The persona to use for LLM-based optimization"),
    api_key: str | None = Field(default=None, description="The API key for the LLM"),
    tavily_api_key: str | None = Field(default=None, description="The API key for Tavily search"),
    aggressiveness: float = Field(default=0.7, description="The aggressiveness level for spaCy-based optimization"),
) -> str:
    if method == "simple":
        if not api_key:
            return "API key is required for simple optimization."
        return optimize_with_llm(prompt, api_key, persona)
    elif method == "agent":
        if not api_key or not tavily_api_key:
            return "API key and Tavily API key are required for agent-based optimization."
        return optimize_with_agent(prompt, api_key, persona, tavily_api_key)
    elif method == "spacy":
        optimizer = AdvancedPromptOptimizer()
        optimized, _, _ = optimizer.optimize(prompt, aggressiveness)
        return optimized
    else:
        return "Invalid method. Use 'simple', 'agent', or 'spacy'."


@mcp.tool(
    name="get_available_personas",
    description="Get list of available optimization personas and their descriptions.",
)
def get_available_personas() -> str:
    # One line per persona: its name plus the first sentence of its description.
    return "\n".join(
        f"- {persona}: {desc.split('.')[0]}..."
        for persona, desc in PERSONAS.items()
    )


@mcp.tool(
    name="count_tokens",
    description="Count tokens in text using the specified model's tokenizer.",
)
def count_tokens(
    text: str = Field(description="The text to count tokens for"),
    model: str = Field(default="gpt-4", description="The model to use for tokenization"),
) -> str:
    count = get_accurate_token_count(text, model)
    return f"Token count: {count}"


@mcp.resource("prompts://optimization", mime_type="application/json")
def list_optimization_methods() -> list[str]:
    return ["simple", "agent", "spacy"]


@mcp.resource("prompts://personas", mime_type="application/json")
def list_personas() -> list[str]:
    return list(PERSONAS.keys())


@mcp.resource("prompts://personas/{persona_id}", mime_type="text/plain")
def fetch_persona_details(persona_id: str) -> str:
    if persona_id not in PERSONAS:
        raise ValueError(f"Persona with id {persona_id} not found")
    return PERSONAS[persona_id]


@mcp.prompt(
    name="optimize_for_persona",
    description="Creates an optimization prompt tailored to a specific persona.",
)
def optimize_for_persona(
    text: str = Field(description="Text to optimize"),
    persona: str = Field(description="Persona to optimize for"),
) -> list[base.Message]:
    if persona not in PERSONAS:
        persona = "Default"
    prompt = f"""
Your goal is to optimize the following text for the {persona} persona.

The text to optimize is:
{text}

Persona guidelines:
{PERSONAS[persona]}

Use the 'optimize_prompt' tool with method='simple' to optimize the text.
After optimization, respond with the optimized version and explain what changes were made.
"""
    return [base.UserMessage(prompt)]
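
# Client-side sketch (assumes the official `mcp` Python SDK's streamable-HTTP
# client and the default local endpoint; both are assumptions, not part of
# this server):
#
#   import asyncio
#   from mcp import ClientSession
#   from mcp.client.streamable_http import streamablehttp_client
#
#   async def demo():
#       async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read, write, _):
#           async with ClientSession(read, write) as session:
#               await session.initialize()
#               result = await session.call_tool(
#                   "optimize_prompt",
#                   {"prompt": "Could you please summarize this article?", "method": "spacy"},
#               )
#               print(result.content)
#
#   asyncio.run(demo())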

if __name__ == "__main__":
    # mcp.run(transport="http", host="127.0.0.1", port=8000, path="/mcp")
    mcp.run(transport="streamable-http")
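
# For quick local testing, the MCP Inspector from the Python SDK's CLI can be
# pointed at this module (command assumed from the SDK's `mcp[cli]` extra;
# `server.py` is a placeholder for this file's actual name):
#
#   mcp dev server.py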