# Spaces:
# Sleeping
# Sleeping
import os

# Cache/config locations are placed in the environment before transformers
# is imported below.
# NOTE(review): HF_HOME is set to "/tmp/huggingface" and then overwritten
# with "/tmp", so "/tmp" is the value in effect at import time, while the
# "/tmp/huggingface" directory is still created. Confirm which is intended.
os.environ.update({
    "HF_HOME": "/tmp/huggingface",
    "MPLCONFIGDIR": "/tmp/mplconfig",
})
os.environ["HF_HOME"] = "/tmp"

for _cache_dir in ("/tmp/huggingface", "/tmp/mplconfig"):
    os.makedirs(_cache_dir, exist_ok=True)

from transformers import AutoTokenizer, AutoModelForTokenClassification

# Repeats the assignment made before the import; the value does not change.
os.environ["HF_HOME"] = "/tmp"
# Load the NER model
# The tokenizer and the token-classification weights are both fetched from
# the same checkpoint name.
_NER_CHECKPOINT = "dslim/bert-base-NER"
tokenizer = AutoTokenizer.from_pretrained(_NER_CHECKPOINT)
model = AutoModelForTokenClassification.from_pretrained(_NER_CHECKPOINT)
import datetime
import logging
from io import BytesIO
from urllib.parse import quote

import matplotlib.pyplot as plt
import requests
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from transformers import pipeline
# Logging: INFO-level basic configuration plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# FastAPI application object.
app = FastAPI()
# Load transformers models
def _load_pipeline(task, model_name, **pipeline_kwargs):
    """Build a transformers pipeline, or return None if loading fails.

    Args:
        task: Pipeline task name passed to ``pipeline``.
        model_name: Checkpoint name passed as ``model=``.
        **pipeline_kwargs: Extra keyword arguments forwarded to ``pipeline``.

    Returns:
        The pipeline object, or ``None`` when construction raised.
    """
    try:
        return pipeline(task, model=model_name, **pipeline_kwargs)
    except Exception as e:
        # Best effort: the endpoints check for None and answer 503.
        logger.error("Model loading failed (%s): %s", model_name, e)
        return None


# Each model is loaded on its own so that a failure in one does not discard
# the other, which may have loaded fine.
ner_model = _load_pipeline("ner", "dslim/bert-base-NER", aggregation_strategy="simple")
sentiment_model = _load_pipeline("sentiment-analysis", "ProsusAI/finbert")
if ner_model is not None and sentiment_model is not None:
    logger.info("Models loaded successfully.")
# Request body schema for sentiment and NER: a single string field.
class TextRequest(BaseModel):
    # Text to analyse.
    text: str
# Request body schema for chart: a single string field.
class CoinRequest(BaseModel):
    # Coin identifier; it is inserted into the CoinGecko URL path.
    coin_id: str
# NOTE(review): no route decorator is visible on this function in this view;
# confirm it is registered on `app`.
def home() -> dict:
    """Return a static liveness message."""
    return {"message": "Crypto News API is alive!"}
# NOTE(review): no route decorator is visible on this function in this view;
# confirm it is registered on `app`.
def analyze_sentiment(req: TextRequest):
    """Classify the sentiment of the request text.

    Args:
        req: Request body whose ``text`` is analysed after stripping
            surrounding whitespace.

    Returns:
        A dict with ``label`` (the model's label) and ``score`` (the model's
        score multiplied by 100 and rounded to two decimals).

    Raises:
        HTTPException: 503 if the sentiment model is unavailable, 400 if the
            text is empty after stripping, 500 if inference fails.
    """
    if not sentiment_model:
        raise HTTPException(status_code=503, detail="Sentiment model not available")

    text = req.text.strip()
    if not text:
        raise HTTPException(status_code=400, detail="Text cannot be empty")

    try:
        # Slicing to 512 characters does not bound the token count (one
        # character can become one or more tokens, plus special tokens), so
        # the tokenizer is also asked to truncate.
        # NOTE(review): max_length=512 assumes the model's input limit is
        # 512 tokens; confirm against the model configuration.
        result = sentiment_model(text[:512], truncation=True, max_length=512)[0]
        return {
            "label": result["label"],
            "score": round(result["score"] * 100, 2),
        }
    except Exception as e:
        logger.error(f"Sentiment analysis error: {e}")
        raise HTTPException(status_code=500, detail="Sentiment analysis failed") from e
# NOTE(review): no route decorator is visible on this function in this view;
# confirm it is registered on `app`.
def analyze_ner(req: TextRequest):
    """Extract up to five distinct named entities from the request text.

    Args:
        req: Request body whose ``text`` is analysed after stripping
            surrounding whitespace; only the first 512 characters are used.

    Returns:
        A dict ``{"entities": [...]}`` with at most five entity strings, in
        first-seen order and without duplicates.

    Raises:
        HTTPException: 503 if the NER model is unavailable, 400 if the text
            is empty after stripping, 500 if inference fails.
    """
    if not ner_model:
        raise HTTPException(status_code=503, detail="NER model not available")

    text = req.text.strip()
    if not text:
        raise HTTPException(status_code=400, detail="Text cannot be empty")

    # dslim/bert-base-NER tags entities as PER, ORG, LOC and MISC, so the
    # previous PERSON/GPE names never matched and people/locations were
    # silently dropped. The older names are kept for other label schemes.
    relevant_groups = {'ORG', 'PER', 'PERSON', 'LOC', 'GPE', 'MISC', 'PRODUCT'}

    try:
        entities = ner_model(text[:512])
        relevant = [e['word'] for e in entities if e.get('entity_group') in relevant_groups]
        # dict.fromkeys de-duplicates while keeping first-seen order.
        unique_entities = list(dict.fromkeys(relevant))[:5]
        return {"entities": unique_entities}
    except Exception as e:
        logger.error(f"NER analysis error: {e}")
        raise HTTPException(status_code=500, detail="NER analysis failed") from e
# NOTE(review): no route decorator is visible on this function in this view;
# confirm it is registered on `app`.
def generate_chart(req: CoinRequest):
    """Render a one-day USD price chart for a coin as a PNG image.

    Requests ``market_chart`` data (``vs_currency=usd``, ``days=1``) for
    ``req.coin_id`` from the CoinGecko API and plots the returned ``prices``
    series with matplotlib.

    Args:
        req: Request body; ``coin_id`` is stripped and lower-cased before use.

    Returns:
        A ``StreamingResponse`` with media type ``image/png``.

    Raises:
        HTTPException: 400 if ``coin_id`` is blank; 502 if the CoinGecko
            request fails or returns a non-200 status; 404 if the response
            has no price data; 500 for any other failure.
    """
    coin_id = req.coin_id.strip().lower()
    if not coin_id:
        raise HTTPException(status_code=400, detail="coin_id cannot be empty")

    # Percent-encode the user-supplied id so characters such as '/', '?' or
    # '#' cannot change the request path or query string.
    url = f"https://api.coingecko.com/api/v3/coins/{quote(coin_id, safe='')}/market_chart"
    params = {"vs_currency": "usd", "days": "1"}

    try:
        # Without a timeout a stalled upstream would block this request
        # indefinitely.
        res = requests.get(url, params=params, timeout=10)
        if res.status_code != 200:
            logger.error(f"CoinGecko API error: {res.text}")
            raise HTTPException(status_code=502, detail="Failed to fetch coin data from CoinGecko")

        prices = res.json().get("prices", [])
        if not prices:
            raise HTTPException(status_code=404, detail="No price data available")

        # Each entry is read as [timestamp, price]; the timestamp is divided
        # by 1000 before conversion, i.e. treated as milliseconds.
        times = [datetime.datetime.fromtimestamp(p[0] / 1000) for p in prices]
        values = [p[1] for p in prices]

        # Draw on explicit figure/axes objects instead of pyplot's implicit
        # "current figure", and always close the figure so it is not leaked
        # when plotting or saving raises.
        fig, ax = plt.subplots(figsize=(12, 6))
        try:
            ax.fill_between(times, values, color="#1f77b4", alpha=0.3)
            ax.plot(times, values, color="#1f77b4", linewidth=3)
            ax.set_title(f"{coin_id.capitalize()} Price (24h)", fontsize=20)
            ax.set_xlabel("Time")
            ax.set_ylabel("USD Price")
            ax.grid(True, linestyle='--', alpha=0.5)
            fig.tight_layout()

            img_bytes = BytesIO()
            fig.savefig(img_bytes, format="png")
        finally:
            plt.close(fig)

        img_bytes.seek(0)
        return StreamingResponse(img_bytes, media_type="image/png")
    except HTTPException:
        # Keep the deliberate 502/404 responses raised above; previously the
        # generic handler below rewrote them as 500.
        raise
    except requests.RequestException as e:
        logger.error(f"CoinGecko request error: {e}")
        raise HTTPException(status_code=502, detail="Failed to fetch coin data from CoinGecko") from e
    except Exception as e:
        logger.error(f"Chart generation error: {e}")
        raise HTTPException(status_code=500, detail="Failed to generate chart") from e