poetica / main.py
abhisheksan's picture
Update main.py
36d6b8e verified
import os
import logging
import sys
from datetime import datetime
from typing import Optional, Dict, Any, List
from functools import lru_cache
import torch
import asyncio
import numpy as np
import re
from fastapi import FastAPI, HTTPException, status, BackgroundTasks, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, validator
from transformers import GPT2Tokenizer, GPT2LMHeadModel, GPT2Config
from contextlib import asynccontextmanager
# Configuration
class Config:
BASE_MODEL_DIR = "./models/"
MODEL_PATH = os.path.join(BASE_MODEL_DIR, "poeticagpt.pth")
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BATCH_SIZE = 8 # Increased batch size for better throughput
CACHE_SIZE = 2048 # Increased cache size
MAX_QUEUE_SIZE = 16 # Maximum number of requests to queue
QUANTIZE_MODEL = True # Enable quantization for improved performance
WARMUP_INPUTS = True # Pre-warm the model with sample inputs
# Use environment-specific log directory or default to a temp directory
LOG_DIR = os.environ.get('LOG_DIR', '/tmp/poetry_logs')
ENABLE_PROFILING = False # Set to True to enable performance profiling
REQUEST_TIMEOUT = 30.0 # Timeout for request processing in seconds
MODEL_CONFIG = GPT2Config(
n_positions=400,
n_ctx=400,
n_embd=384,
n_layer=6,
n_head=6,
vocab_size=50257,
bos_token_id=50256,
eos_token_id=50256,
use_cache=True,
)
config = Config()
# Configure logging with proper error handling
def setup_logging():
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Always add stdout handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# Try to set up file handler, but handle permission issues gracefully
try:
# Attempt to create directory if it doesn't exist
os.makedirs(config.LOG_DIR, exist_ok=True)
log_file = os.path.join(
config.LOG_DIR,
f'poetry_generation_{datetime.now().strftime("%Y%m%d")}.log'
)
# Test if we can write to the file
with open(log_file, 'a') as f:
pass
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
print(f"Log file created at: {log_file}")
except (PermissionError, OSError) as e:
print(f"Warning: Could not create log file: {e}")
print(f"Continuing with console logging only.")
return logger
# Initialize logger
logger = setup_logging()
# Request models
class GenerateRequest(BaseModel):
prompt: str = Field(..., min_length=1, max_length=500)
max_length: Optional[int] = Field(default=100, ge=10, le=500)
temperature: float = Field(default=0.9, ge=0.1, le=2.0)
top_k: int = Field(default=50, ge=1, le=100)
top_p: float = Field(default=0.95, ge=0.1, le=1.0)
repetition_penalty: float = Field(default=1.2, ge=1.0, le=2.0)
style: Optional[str] = Field(default="free_verse",
description="Poetry style: free_verse, haiku, sonnet")
@validator('prompt')
def validate_prompt(cls, v):
# Normalize whitespace
v = ' '.join(v.split())
return v
# Poem formatting module
class PoemFormatter:
"""Efficient poem formatter with optimized text processing"""
@staticmethod
def format_free_verse(text: str) -> List[str]:
# More efficient regex splitting
lines = re.split(r'[.!?]+|\n+', text)
lines = [line.strip() for line in lines if line.strip()]
formatted_lines = []
for line in lines:
if len(line) > 40:
parts = line.split(',')
formatted_lines.extend(part.strip() for part in parts if part.strip())
else:
formatted_lines.append(line)
return formatted_lines
@staticmethod
def format_haiku(text: str) -> List[str]:
# Precompile regex for performance
vowel_pattern = re.compile(r'[aeiou]+')
words = text.split()
lines = []
current_line = []
syllable_count = 0
syllable_targets = [5, 7, 5] # Traditional haiku structure
current_target_idx = 0
for word in words:
syllables = len(vowel_pattern.findall(word.lower())) or 1 # Ensure at least 1 syllable
if current_target_idx >= len(syllable_targets):
break
current_target = syllable_targets[current_target_idx]
if syllable_count + syllables <= current_target:
current_line.append(word)
syllable_count += syllables
else:
if current_line:
lines.append(' '.join(current_line))
current_line = [word]
syllable_count = syllables
current_target_idx += 1
if current_line and len(lines) < len(syllable_targets):
lines.append(' '.join(current_line))
# Ensure we have exactly 3 lines for a haiku
while len(lines) < 3:
lines.append("...")
return lines[:3]
@staticmethod
def format_sonnet(text: str) -> List[str]:
words = text.split()
lines = []
current_line = []
target_line_length = 10 # Approximate iambic pentameter
for word in words:
current_line.append(word)
if len(current_line) >= target_line_length:
lines.append(' '.join(current_line))
current_line = []
if len(lines) >= 14: # Traditional sonnet has 14 lines
break
if current_line and len(lines) < 14:
lines.append(' '.join(current_line))
# Ensure we have 14 lines for a complete sonnet
while len(lines) < 14:
lines.append("...")
return lines
@staticmethod
def generate_title(poem_text: str) -> str:
words = poem_text.split()[:10] # Use more words to find better title candidates
stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'with', 'by'}
key_words = [word for word in words if word.lower() not in stop_words and len(word) > 2]
if key_words:
title = ' '.join(key_words[:3]).strip().capitalize()
return title if title else "Untitled"
return "Untitled"
# Request queue for efficient processing
class RequestQueue:
def __init__(self, max_size=config.MAX_QUEUE_SIZE):
self.queue = asyncio.Queue(maxsize=max_size)
self.semaphore = asyncio.Semaphore(max_size)
async def add_request(self, request_data):
async with self.semaphore:
return await asyncio.wait_for(
self._process_request(request_data),
timeout=config.REQUEST_TIMEOUT
)
async def _process_request(self, request_data):
future = asyncio.Future()
await self.queue.put((request_data, future))
return await future
# Optimized Tokenization Service
class TokenizationService:
def __init__(self):
self.tokenizer = None
self._lock = asyncio.Lock()
@lru_cache(maxsize=config.CACHE_SIZE)
def cached_tokenize(self, text):
return self.tokenizer.encode(text, return_tensors='pt')
async def initialize(self):
async with self._lock:
if self.tokenizer is None:
logger.info("Initializing tokenizer")
self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
self.tokenizer.pad_token = self.tokenizer.eos_token
return self.tokenizer
async def encode(self, text):
if not self.tokenizer:
await self.initialize()
# Use multithreading for tokenization if the text is large
if len(text) > 100:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.cached_tokenize(text)
)
else:
return self.cached_tokenize(text)
def decode(self, tokens, skip_special_tokens=True):
return self.tokenizer.decode(tokens, skip_special_tokens=skip_special_tokens)
# Model Manager with optimization techniques
class ModelManager:
def __init__(self):
self.model = None
self._lock = asyncio.Lock()
self.request_count = 0
self.last_cleanup = datetime.now()
self.model_ready = asyncio.Event()
self.tokenization_service = TokenizationService()
self.request_queue = RequestQueue()
self.poem_formatter = PoemFormatter()
self.batch_processor_task = None
async def initialize(self) -> bool:
try:
logger.info(f"Initializing model on device: {config.DEVICE}")
# Check if model file exists
if not os.path.exists(config.MODEL_PATH):
logger.error(f"Model file not found at {config.MODEL_PATH}")
# Try to create directory in case it doesn't exist
os.makedirs(os.path.dirname(config.MODEL_PATH), exist_ok=True)
return False
await self.tokenization_service.initialize()
await self._load_and_optimize_model()
# Start batch processing worker
self.batch_processor_task = asyncio.create_task(self._batch_processor_worker())
logger.info(f"Model and tokenizer loaded successfully on {config.DEVICE}")
self.model_ready.set()
# Warmup the model with dummy inputs
if config.WARMUP_INPUTS:
await self._warmup_model()
return True
except Exception as e:
logger.error(f"Error initializing model: {str(e)}")
logger.exception("Detailed traceback:")
return False
async def _batch_processor_worker(self):
"""Worker that processes queued requests in batches"""
logger.info("Starting batch processor worker")
try:
while True:
# Process requests in batches when possible
if not self.request_queue.queue.empty():
batch = []
batch_futures = []
# Get up to BATCH_SIZE requests from the queue
batch_size = min(config.BATCH_SIZE, self.request_queue.queue.qsize())
for _ in range(batch_size):
if self.request_queue.queue.empty():
break
request_data, future = await self.request_queue.queue.get()
batch.append(request_data)
batch_futures.append(future)
if batch:
try:
# Process the batch
results = await self._process_batch(batch)
# Set results to futures
for i, future in enumerate(batch_futures):
if not future.done():
future.set_result(results[i])
except Exception as e:
# Set exception to all futures in the batch
for future in batch_futures:
if not future.done():
future.set_exception(e)
finally:
# Mark tasks as done
for _ in range(len(batch)):
self.request_queue.queue.task_done()
else:
# If queue is empty, sleep briefly before checking again
await asyncio.sleep(0.01)
except asyncio.CancelledError:
logger.info("Batch processor worker cancelled")
except Exception as e:
logger.error(f"Error in batch processor worker: {str(e)}")
logger.exception("Detailed traceback")
async def _process_batch(self, batch_requests):
"""Process a batch of requests efficiently"""
results = []
# Use with torch.no_grad() for all requests in the batch
with torch.no_grad():
for request in batch_requests:
try:
# Prepare inputs
inputs = await self._prepare_inputs(request.prompt)
# Generate text
outputs = await self._generate_optimized(inputs, request)
# Process outputs
result = await self._process_outputs(outputs, request)
results.append(result)
except Exception as e:
logger.error(f"Error processing request in batch: {str(e)}")
results.append({"error": str(e)})
return results
async def _load_and_optimize_model(self):
"""Load and optimize the model with advanced techniques"""
async with self._lock:
if not os.path.exists(config.MODEL_PATH):
raise FileNotFoundError(f"Model file not found at {config.MODEL_PATH}")
# Create model with configuration
self.model = GPT2LMHeadModel(config.MODEL_CONFIG)
# Load state dict
state_dict = torch.load(config.MODEL_PATH, map_location=config.DEVICE)
self.model.load_state_dict(state_dict, strict=False)
# Move model to device
self.model.to(config.DEVICE)
self.model.eval() # Set to evaluation mode
# Apply quantization if enabled and supported
if config.QUANTIZE_MODEL and config.DEVICE.type == 'cuda':
try:
# Use dynamic quantization for better inference performance
torch.quantization.quantize_dynamic(
self.model, {torch.nn.Linear}, dtype=torch.qint8
)
logger.info("Model quantized successfully")
except Exception as e:
logger.warning(f"Quantization failed, using full precision: {str(e)}")
# Apply other optimizations for CUDA devices
if config.DEVICE.type == 'cuda':
# Set optimization flags
torch.backends.cudnn.benchmark = True
# Enable TF32 precision if available (on A100 GPUs)
if hasattr(torch.backends.cuda, 'matmul') and hasattr(torch.backends.cuda.matmul, 'allow_tf32'):
torch.backends.cuda.matmul.allow_tf32 = True
# Convert model to TorchScript for faster inference
try:
# Use a safer approach to TorchScript optimization
self.model = torch.jit.script(self.model)
logger.info("Model optimized with TorchScript")
except Exception as e:
logger.warning(f"TorchScript optimization failed: {str(e)}")
async def _warmup_model(self):
"""Pre-warm the model with sample inputs to eliminate cold start issues"""
logger.info("Warming up model...")
# Create dummy inputs of different lengths
dummy_texts = [
"Write a poem about nature",
"Write a poem about love and loss in the modern world"
]
# Process dummy requests
dummy_requests = [
GenerateRequest(prompt=text, max_length=50, temperature=0.9)
for text in dummy_texts
]
for req in dummy_requests:
try:
with torch.no_grad():
# Prepare inputs
inputs = await self._prepare_inputs(req.prompt)
# Run model inference
_ = await self._generate_optimized(inputs, req)
except Exception as e:
logger.warning(f"Model warmup error: {str(e)}")
logger.info("Model warmup completed")
async def _prepare_inputs(self, prompt: str):
"""Prepare model inputs with optimized tokenization"""
poetry_prompt = f"Write a poem about: {prompt}\n\nPoem:"
tokens = await self.tokenization_service.encode(poetry_prompt)
return tokens.to(config.DEVICE)
async def _generate_optimized(self, inputs, request: GenerateRequest):
"""Optimized text generation with style-specific parameters"""
attention_mask = torch.ones(inputs.shape, dtype=torch.long, device=config.DEVICE)
# Style-specific parameters
style_params = {
"haiku": {"max_length": 50, "repetition_penalty": 1.4, "no_repeat_ngram_size": 2},
"sonnet": {"max_length": 200, "repetition_penalty": 1.2, "no_repeat_ngram_size": 3},
"free_verse": {
"max_length": request.max_length,
"repetition_penalty": request.repetition_penalty,
"no_repeat_ngram_size": 3
}
}
params = style_params.get(request.style, style_params["free_verse"])
# Get bad word IDs for filtering
tokenizer = await self.tokenization_service.initialize()
bad_words = ['http', 'www', 'com', ':', '/', '#', '[', ']', '{', '}']
bad_words_ids = [[tokenizer.encode(word)[0]] for word in bad_words if len(tokenizer.encode(word)) > 0]
return self.model.generate(
inputs,
attention_mask=attention_mask,
max_length=params["max_length"],
num_return_sequences=1,
temperature=request.temperature,
top_k=request.top_k,
top_p=request.top_p,
repetition_penalty=params["repetition_penalty"],
do_sample=True,
pad_token_id=tokenizer.eos_token_id,
use_cache=True,
no_repeat_ngram_size=params["no_repeat_ngram_size"],
early_stopping=True,
bad_words_ids=bad_words_ids,
min_length=20 if request.style != "haiku" else 10,
)
async def _process_outputs(self, outputs, request: GenerateRequest):
"""Process and format the generated text into a poem"""
# Decode generated text
raw_text = self.tokenization_service.decode(outputs[0], skip_special_tokens=True)
# Extract poem from generated text
prompt_pattern = f"Write a poem about: {request.prompt}\n\nPoem:"
poem_text = raw_text.replace(prompt_pattern, '').strip()
# Format based on style
if request.style == "haiku":
formatted_lines = self.poem_formatter.format_haiku(poem_text)
elif request.style == "sonnet":
formatted_lines = self.poem_formatter.format_sonnet(poem_text)
else:
formatted_lines = self.poem_formatter.format_free_verse(poem_text)
# Generate response
return {
"poem": {
"title": self.poem_formatter.generate_title(poem_text),
"lines": formatted_lines,
"style": request.style
},
"original_prompt": request.prompt,
"parameters": {
"max_length": request.max_length,
"temperature": request.temperature,
"top_k": request.top_k,
"top_p": request.top_p,
"repetition_penalty": request.repetition_penalty
},
"metadata": {
"device": config.DEVICE.type,
"model_type": "GPT2-Optimized",
"timestamp": datetime.now().isoformat()
}
}
async def generate(self, request: GenerateRequest) -> Dict[str, Any]:
"""Queue a request for generation and await result"""
try:
# Wait for model to be ready
await asyncio.wait_for(self.model_ready.wait(), timeout=60.0)
self.request_count += 1
# Add request to queue and get result
result = await self.request_queue.add_request(request)
return result
except asyncio.TimeoutError:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Model is still initializing or overloaded"
)
except Exception as e:
logger.error(f"Error generating text: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
async def cleanup(self):
"""Perform memory cleanup operations"""
if config.DEVICE.type == 'cuda':
torch.cuda.empty_cache()
self.last_cleanup = datetime.now()
logger.info("Memory cleanup performed")
async def shutdown(self):
"""Clean shutdown of the model manager"""
# Cancel batch processor worker
if self.batch_processor_task:
self.batch_processor_task.cancel()
try:
await self.batch_processor_task
except asyncio.CancelledError:
pass
# Clear model from memory
if self.model is not None:
self.model = None
# Clear tokenizer from memory
if self.tokenization_service.tokenizer is not None:
self.tokenization_service.tokenizer = None
# Final memory cleanup
if config.DEVICE.type == 'cuda':
torch.cuda.empty_cache()
# Create model manager instance
model_manager = ModelManager()
# FastAPI lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialize on startup
initialized = await model_manager.initialize()
if not initialized:
logger.error("Failed to initialize model manager")
yield
# Clean up on shutdown
logger.info("Shutting down Poetry Generation API")
await model_manager.shutdown()
# Create FastAPI app
app = FastAPI(
title="Poetry Generation API",
description="High-Performance API for generating poetry using GPT-2",
version="3.0.0",
lifespan=lifespan
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Health check endpoint
@app.api_route("/health", methods=["GET", "HEAD"])
async def health_check():
return {
"status": "healthy",
"model_loaded": model_manager.model is not None,
"model_ready": model_manager.model_ready.is_set(),
"tokenizer_loaded": model_manager.tokenization_service.tokenizer is not None,
"device": config.DEVICE.type,
"request_count": model_manager.request_count,
"queue_size": model_manager.request_queue.queue.qsize(),
"last_cleanup": model_manager.last_cleanup.isoformat(),
"system_info": {
"cuda_available": torch.cuda.is_available(),
"cuda_device_count": torch.cuda.device_count() if torch.cuda.is_available() else 0,
"cuda_memory": {
"allocated": f"{torch.cuda.memory_allocated() / (1024**2):.2f} MB",
"reserved": f"{torch.cuda.memory_reserved() / (1024**2):.2f} MB",
"max_allocated": f"{torch.cuda.max_memory_allocated() / (1024**2):.2f} MB"
} if torch.cuda.is_available() else {},
}
}
# Poetry generation endpoint
@app.post("/generate")
async def generate_text(
request: GenerateRequest,
background_tasks: BackgroundTasks
):
try:
result = await model_manager.generate(request)
# Schedule cleanup every 50 requests
if model_manager.request_count % 50 == 0:
background_tasks.add_task(model_manager.cleanup)
return JSONResponse(
content=result,
status_code=status.HTTP_200_OK
)
except HTTPException as e:
# Re-raise HTTP exceptions
raise
except Exception as e:
logger.error(f"Error in generate_text: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
# Add profiling endpoint if profiling is enabled
if config.ENABLE_PROFILING:
@app.get("/profiling")
async def get_profiling():
if config.DEVICE.type == 'cuda':
return {
"memory": {
"allocated": f"{torch.cuda.memory_allocated() / (1024**2):.2f} MB",
"reserved": f"{torch.cuda.memory_reserved() / (1024**2):.2f} MB",
"max_allocated": f"{torch.cuda.max_memory_allocated() / (1024**2):.2f} MB"
},
"request_count": model_manager.request_count,
"queue_size": model_manager.request_queue.queue.qsize(),
}
else:
return {"device": "cpu", "profiling": "not available"}