Spaces:
Sleeping
Sleeping
File size: 4,407 Bytes
7672fa1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
#!/usr/bin/env python3
"""
FastAPI app to handle data processing for text data.
Author: Shilpaj Bhalerao
Date: Oct 29, 2024
"""
# Standard imports
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse
from fastapi import Request
from pydantic import BaseModel
import os
from pathlib import Path
# Local imports
from byte_pair_encoding import BPETokenizer
# Application object; every route below is registered on it via decorators.
app = FastAPI()
# Serve the contents of the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
# Jinja2 template loader rooted at the local "templates" directory (used by `root`).
templates = Jinja2Templates(directory="templates")
# Request body schema shared by the /process_text and /decode_text endpoints.
class TextRequest(BaseModel):
    # Raw payload: plain text for /process_text, or a token list written as a
    # string (for example "[1, 2, 3]") for /decode_text.
    text: str
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
    """Render the main page"""
    # The incoming request is handed to the template under the "request" key.
    context = {"request": request}
    page = templates.TemplateResponse("index.html", context)
    return page
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """Handle file upload

    Accepts an uploaded file whose content type starts with ``text`` and
    returns its decoded contents.

    Args:
        file: The uploaded file from the multipart form.

    Returns:
        A dict ``{"type": "text", "text": <decoded file contents>}``.

    Raises:
        HTTPException: 400 if the content type is missing or is not a text
            type, or if the bytes cannot be decoded as UTF-8; 500 for any
            other failure while reading the upload.
    """
    print(f"Received file: {file.filename}")

    # The content type can be absent on an upload; treat that as unsupported
    # instead of failing on ``None.startswith``.
    content_type = file.content_type or ""
    if not content_type.startswith("text"):
        print("Unsupported file type")
        # Raised outside any try block so the 400 is not rewritten as a 500
        # by a broad exception handler.
        raise HTTPException(status_code=400, detail="Unsupported file type. Please upload a text file.")

    print("Text file detected")
    try:
        content = await file.read()
        # Convert bytes to string (UTF-8 is the default codec).
        text = content.decode()
    except UnicodeDecodeError as e:
        # Bytes that are not valid text are a problem with the upload itself.
        print(f"Error: {str(e)}")
        raise HTTPException(status_code=400, detail="File is not valid UTF-8 text.") from e
    except Exception as e:
        print(f"Error: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {"type": "text", "text": text}
@app.post("/process")
async def process_data(file: UploadFile = File(...)):
    """Process the uploaded text file by tokenizing it using BPE

    Args:
        file: The uploaded file from the multipart form.

    Returns:
        A dict ``{"type": "text", "processed_data": <tokens>}`` where the
        tokens are whatever ``BPETokenizer.encode`` returns for the file text.

    Raises:
        HTTPException: 400 if the content type is missing or is not a text
            type, or if the bytes cannot be decoded as UTF-8; 500 if loading
            the tokenizer or encoding the text fails.
    """
    # The content type can be absent on an upload; treat that as unsupported
    # instead of failing on ``None.startswith``.
    content_type = file.content_type or ""
    if not content_type.startswith("text"):
        raise HTTPException(status_code=400, detail="Unsupported file type. Please upload a text file.")

    content = await file.read()
    try:
        text = content.decode()
    except UnicodeDecodeError as e:
        # Undecodable bytes are a problem with the upload, not the server.
        raise HTTPException(status_code=400, detail="File is not valid UTF-8 text.") from e

    try:
        # Load tokenizer and process text
        tokenizer = BPETokenizer.load("tokenizer.json")
        tokens = tokenizer.encode(text)
    except Exception as e:
        # Same error reporting as /process_text: surface the message in a 500.
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {"type": "text", "processed_data": tokens}
@app.get("/sample/{sample_number}")
async def get_sample(sample_number: int):
    """Get sample text file content

    Reads ``samples/sample<sample_number>.txt`` as UTF-8 text.

    Args:
        sample_number: Number used to build the sample file name.

    Returns:
        A dict ``{"type": "text", "text": <file contents>}``.

    Raises:
        HTTPException: 404 if the sample file does not exist; 500 for any
            other failure while reading it.
    """
    sample_path = Path(f"samples/sample{sample_number}.txt")
    try:
        text = sample_path.read_text(encoding="utf-8")
    except FileNotFoundError as e:
        # Handled on its own so a missing sample is reported as 404 and is
        # not turned into a 500 by the generic handler below.
        raise HTTPException(status_code=404, detail="Sample file not found") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"type": "text", "text": text}
# Tokenize text sent directly in the request body (no file upload involved).
@app.post("/process_text")
async def process_text(text_request: TextRequest):
    """Process text directly without file upload"""
    try:
        # The tokenizer is loaded from tokenizer.json on every call.
        encoded = BPETokenizer.load("tokenizer.json").encode(text_request.text)
    except Exception as failure:
        # Any failure is reported to the client as a 500 carrying the message.
        raise HTTPException(status_code=500, detail=str(failure))
    return {"type": "text", "processed_data": encoded}
# Decode a comma-separated list of token ids back into text.
@app.post("/decode_text")
async def decode_text(text_request: TextRequest):
    """Decode the tokenized text back to original form

    The request text is expected to be a comma-separated list of integers,
    optionally wrapped in square brackets, for example ``"[1, 2, 3]"``.

    Args:
        text_request: Request body whose ``text`` field holds the token list.

    Returns:
        A dict ``{"type": "text", "decoded_text": <decoded string>}``.

    Raises:
        HTTPException: 400 if the token string is empty or contains a value
            that is not an integer (or decoding raises ``ValueError``); 500 if
            the tokenizer cannot be loaded or decoding fails for another reason.
    """
    # A tokenizer that cannot be loaded is a server-side problem, so it is
    # handled separately and never reported as a 400 "Invalid token format".
    try:
        tokenizer = BPETokenizer.load("tokenizer.json")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    try:
        # Trim surrounding whitespace first; otherwise a trailing newline or
        # leading space keeps the brackets from being stripped.
        token_str = text_request.text.strip().strip("[]").replace(" ", "")
        if not token_str:
            raise ValueError("Empty token string")
        # Split by comma and convert to integers, skipping empty items.
        tokens = [int(t) for t in token_str.split(",") if t]
        decoded_text = tokenizer.decode(tokens)
    except ValueError as ve:
        raise HTTPException(status_code=400, detail=f"Invalid token format: {str(ve)}") from ve
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {"type": "text", "decoded_text": decoded_text}
if __name__ == "__main__":
    # Imported inside the guard, so uvicorn is only required when this file
    # is executed directly.
    import uvicorn

    # Serve the app on all interfaces, port 7860.
    uvicorn.run(app, port=7860, host="0.0.0.0")
|