import json
import os
import logging
from typing import Optional, Dict, Any, Union

import gradio as gr
from PIL import Image
from pydantic import BaseModel

from config import Config

try:
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError as e:
    print(f"Warning: llama-cpp-python not available: {e}")
    LLAMA_CPP_AVAILABLE = False
    Llama = None

try:
    from huggingface_hub import hf_hub_download
    HUGGINGFACE_HUB_AVAILABLE = True
except ImportError as e:
    print(f"Warning: huggingface_hub not available: {e}")
    HUGGINGFACE_HUB_AVAILABLE = False
    hf_hub_download = None

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class StructuredOutputRequest(BaseModel):
    prompt: str
    image: Optional[str] = None
    json_schema: Dict[str, Any]
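
# Illustrative only: the request model can validate an incoming payload before it
# reaches the client (the field values below are hypothetical):
#   StructuredOutputRequest(
#       prompt="Summarize the text",
#       json_schema={"type": "object", "properties": {"summary": {"type": "string"}}},
#   )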


class LLMClient:
    def __init__(self):
        """Initialize the client for a local GGUF model via llama-cpp-python."""
        self.model_path = Config.get_model_path()
        logger.info(f"Using model: {self.model_path}")

        self.llm = None
        self._initialize_model()

    def _download_model_if_needed(self) -> str:
        """Download the model from Hugging Face if it does not exist locally."""
        if os.path.exists(self.model_path):
            logger.info(f"Model already exists at: {self.model_path}")
            return self.model_path

        # Inside a Docker container the model may have been mounted at an
        # alternative location, so check the common mount points first.
        if os.getenv('DOCKER_CONTAINER', 'false').lower() == 'true':
            alternative_paths = [
                f"/app/models/{Config.MODEL_FILENAME}",
                f"./models/{Config.MODEL_FILENAME}",
                f"/models/{Config.MODEL_FILENAME}",
                f"/app/{Config.MODEL_FILENAME}"
            ]

            for alt_path in alternative_paths:
                if os.path.exists(alt_path):
                    logger.info(f"Found model at alternative location: {alt_path}")
                    return alt_path

            # Log the directory contents to help diagnose a missing mount.
            models_dir = "/app/models"
            if os.path.exists(models_dir):
                files = os.listdir(models_dir)
                logger.error(f"Contents of {models_dir}: {files}")
            else:
                logger.error(f"Directory {models_dir} does not exist")

        logger.warning("Model not found in expected locations, attempting download...")

        if not HUGGINGFACE_HUB_AVAILABLE:
            raise ImportError("huggingface_hub is not available. Please install it to download models.")

        logger.info(f"Downloading model {Config.MODEL_REPO}/{Config.MODEL_FILENAME}...")

        models_dir = Config.get_models_dir()
        os.makedirs(models_dir, exist_ok=True)

        try:
            model_path = hf_hub_download(
                repo_id=Config.MODEL_REPO,
                filename=Config.MODEL_FILENAME,
                local_dir=models_dir,
                token=Config.HUGGINGFACE_TOKEN if Config.HUGGINGFACE_TOKEN else None
            )
            logger.info(f"Model downloaded to: {model_path}")
            return model_path
        except Exception as e:
            logger.error(f"Failed to download model: {e}")
            raise

    def _initialize_model(self):
        """Initialize the local GGUF model."""
        try:
            if not LLAMA_CPP_AVAILABLE:
                raise ImportError("llama-cpp-python is not available. Please check installation.")

            logger.info("Loading local model...")

            # Resolve the model path, downloading the file if necessary.
            model_path = self._download_model_if_needed()

            if not os.path.exists(model_path):
                raise FileNotFoundError(f"Model file not found: {model_path}")

            # A GGUF file smaller than 1 MB is almost certainly incomplete.
            file_size = os.path.getsize(model_path)
            if file_size < 1024 * 1024:
                raise ValueError(f"Model file seems corrupted or incomplete. Size: {file_size} bytes")

            logger.info(f"Model file verified. Size: {file_size / (1024**3):.2f} GB")

            logger.info("Initializing Llama model...")
            self.llm = Llama(
                model_path=model_path,
                n_ctx=Config.N_CTX,
                n_batch=Config.N_BATCH,
                n_gpu_layers=Config.N_GPU_LAYERS,
                use_mlock=Config.USE_MLOCK,
                use_mmap=Config.USE_MMAP,
                vocab_only=False,
                f16_kv=Config.F16_KV,
                logits_all=False,
                embedding=False,
                n_threads=Config.N_THREADS,
                last_n_tokens_size=64,
                lora_base=None,
                lora_path=None,
                seed=Config.SEED,
                verbose=True
            )

            logger.info("Model successfully loaded and initialized")

            # Smoke-test the loaded model with a minimal completion.
            logger.info("Testing model with simple prompt...")
            self.llm("Hello", max_tokens=1, temperature=0.1)
            logger.info("Model test successful")

        except Exception as e:
            logger.error(f"Error initializing model: {e}")
            if "Failed to load model from file" in str(e):
                logger.error("This error usually indicates:")
                logger.error("1. Model file is corrupted or incomplete")
                logger.error("2. llama-cpp-python version is incompatible with the model")
                logger.error("3. Insufficient memory to load the model")
                logger.error(f"4. Model path: {self.model_path}")
            raise

    def _validate_json_schema(self, schema: str) -> Dict[str, Any]:
        """Validate and parse JSON schema"""
        try:
            parsed_schema = json.loads(schema)
            return parsed_schema
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON schema: {e}")
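
    # Illustrative behaviour:
    #   _validate_json_schema('{"type": "object"}')  ->  {"type": "object"}
    #   _validate_json_schema('not json')            ->  raises ValueError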

    def _format_prompt_with_schema(self, prompt: str, json_schema: Dict[str, Any]) -> str:
        """Format prompt for structured output generation"""
        schema_str = json.dumps(json_schema, ensure_ascii=False, indent=2)

        formatted_prompt = f"""User: {prompt}

Please respond in strict accordance with the following JSON schema:

```json
{schema_str}
```

Return ONLY valid JSON without additional comments or explanations."""

        return formatted_prompt

    def generate_structured_response(self,
                                     prompt: str,
                                     json_schema: Union[str, Dict[str, Any]],
                                     image: Optional[Image.Image] = None) -> Dict[str, Any]:
        """Generate a structured response from the local GGUF model."""
        try:
            # Accept the schema either as a JSON string or an already-parsed dict.
            if isinstance(json_schema, str):
                parsed_schema = self._validate_json_schema(json_schema)
            else:
                parsed_schema = json_schema

            formatted_prompt = self._format_prompt_with_schema(prompt, parsed_schema)

            if image is not None:
                logger.warning("Image processing is not supported with this local model")

            logger.info("Generating response...")
            response = self.llm(
                formatted_prompt,
                max_tokens=Config.MAX_NEW_TOKENS,
                temperature=Config.TEMPERATURE,
                stop=["User:", "\n\n"],
                echo=False
            )

            generated_text = response['choices'][0]['text']

            try:
                # Heuristic extraction: take the substring from the first '{'
                # to the last '}' and try to parse it as JSON.
                json_start = generated_text.find('{')
                json_end = generated_text.rfind('}') + 1

                if json_start != -1 and json_end > json_start:
                    json_str = generated_text[json_start:json_end]
                    parsed_response = json.loads(json_str)
                    return {
                        "success": True,
                        "data": parsed_response,
                        "raw_response": generated_text
                    }
                else:
                    return {
                        "error": "Could not find JSON in model response",
                        "raw_response": generated_text
                    }

            except json.JSONDecodeError as e:
                return {
                    "error": f"JSON parsing error: {e}",
                    "raw_response": generated_text
                }

        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            return {
                "error": f"Generation error: {str(e)}"
            }
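
    # Minimal usage sketch (illustrative values, not executed at import time):
    #   client = LLMClient()
    #   result = client.generate_structured_response(
    #       "Summarize the launch announcement",
    #       {"type": "object", "properties": {"summary": {"type": "string"}}},
    #   )
    #   # success -> {"success": True, "data": {...}, "raw_response": "..."}
    #   # failure -> {"error": "...", "raw_response": "..."} or {"error": "Generation error: ..."}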


logger.info("Initializing LLM client...")
try:
    llm_client = LLMClient()
    logger.info("LLM client successfully initialized")
except Exception as e:
    logger.error(f"Error initializing LLM client: {e}")
    llm_client = None


def process_request(prompt: str,
                    json_schema: str,
                    image: Optional[Image.Image] = None) -> str:
    """Process a request submitted through the Gradio interface."""
    if llm_client is None:
        return json.dumps({
            "error": "LLM client not initialized",
            "details": "Check logs for detailed error information"
        }, ensure_ascii=False, indent=2)

    if not prompt.strip():
        return json.dumps({"error": "Prompt cannot be empty"}, ensure_ascii=False, indent=2)

    if not json_schema.strip():
        return json.dumps({"error": "JSON schema cannot be empty"}, ensure_ascii=False, indent=2)

    result = llm_client.generate_structured_response(prompt, json_schema, image)
    return json.dumps(result, ensure_ascii=False, indent=2)
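
# Direct (non-UI) use is also possible; illustrative call with a hypothetical minimal schema:
#   print(process_request("Describe the weather", '{"type": "object"}'))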


example_schema = """{
    "type": "object",
    "properties": {
        "summary": {
            "type": "string",
            "description": "Brief summary of the response"
        },
        "sentiment": {
            "type": "string",
            "enum": ["positive", "negative", "neutral"],
            "description": "Emotional tone"
        },
        "confidence": {
            "type": "number",
            "minimum": 0,
            "maximum": 1,
            "description": "Confidence level in the response"
        },
        "keywords": {
            "type": "array",
            "items": {
                "type": "string"
            },
            "description": "Key words"
        }
    },
    "required": ["summary", "sentiment", "confidence"]
}"""
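
# A response conforming to the example schema might look like this (hypothetical values):
#   {"summary": "...", "sentiment": "positive", "confidence": 0.92,
#    "keywords": ["product", "sales"]}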

example_prompt = "Analyze the following text and provide a structured assessment: 'The company's new product received enthusiastic user reviews. Sales exceeded all expectations by 150%.'"


def create_gradio_interface():
    """Create the Gradio interface."""
    with gr.Blocks(title="LLM Structured Output", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🤖 LLM with Structured Output")
        gr.Markdown(f"Application for generating structured responses using model **{Config.MODEL_REPO}/{Config.MODEL_FILENAME}**")

        if llm_client is None:
            gr.Markdown("⚠️ **Warning**: Model not loaded. Check the configuration and restart the application.")
        else:
            gr.Markdown("✅ **Status**: Model successfully loaded and ready to use")

        with gr.Row():
            with gr.Column():
                prompt_input = gr.Textbox(
                    label="Prompt for model",
                    placeholder="Enter your request...",
                    lines=5,
                    value=example_prompt
                )

                image_input = gr.Image(
                    label="Image (optional, for multimodal models)",
                    type="pil"
                )

                schema_input = gr.Textbox(
                    label="JSON schema for response structure",
                    placeholder="Enter JSON schema...",
                    lines=15,
                    value=example_schema
                )

                submit_btn = gr.Button("Generate Response", variant="primary")

            with gr.Column():
                output = gr.Textbox(
                    label="Structured Response",
                    lines=20,
                    interactive=False
                )

        submit_btn.click(
            fn=process_request,
            inputs=[prompt_input, schema_input, image_input],
            outputs=output
        )

        gr.Markdown("## 📋 Usage Examples")

        examples = gr.Examples(
            examples=[
                [
                    "Describe today's weather in New York",
                    """{
    "type": "object",
    "properties": {
        "temperature": {"type": "number"},
        "description": {"type": "string"},
        "humidity": {"type": "number"}
    }
}""",
                    None
                ],
                [
                    "Create a Python learning plan for one month",
                    """{
    "type": "object",
    "properties": {
        "weeks": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "week_number": {"type": "integer"},
                    "topics": {"type": "array", "items": {"type": "string"}},
                    "practice_hours": {"type": "number"}
                }
            }
        },
        "total_hours": {"type": "number"}
    }
}""",
                    None
                ]
            ],
            inputs=[prompt_input, schema_input, image_input]
        )

        gr.Markdown(f"""
## ℹ️ Model Information

- **Model**: {Config.MODEL_REPO}/{Config.MODEL_FILENAME}
- **Local path**: {Config.MODEL_PATH}
- **Context window**: {Config.N_CTX} tokens
- **Batch size**: {Config.N_BATCH}
- **GPU layers**: {Config.N_GPU_LAYERS if Config.N_GPU_LAYERS >= 0 else "All"}
- **CPU threads**: {Config.N_THREADS}
- **Maximum response length**: {Config.MAX_NEW_TOKENS} tokens
- **Temperature**: {Config.TEMPERATURE}
- **Memory lock**: {"Enabled" if Config.USE_MLOCK else "Disabled"}
- **Memory mapping**: {"Enabled" if Config.USE_MMAP else "Disabled"}

💡 **Tip**: Use clear and specific JSON schemas for better results.
""")

    return demo


if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.launch(
        server_name=Config.HOST,
        server_port=Config.GRADIO_PORT,
        share=False,
        debug=True
    )