Spaces:
Running
Running
| ## OBSERVABILITY | |
| from uuid import uuid4 | |
| import csv | |
| from io import StringIO | |
| from fastapi import APIRouter, HTTPException | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel | |
| from typing import List, Dict, Optional | |
| from observability import LLMObservabilityManager | |
| router = APIRouter( | |
| prefix="/observability", | |
| tags=["observability"] | |
| ) | |
| class ObservationResponse(BaseModel): | |
| observations: List[Dict] | |
| def create_csv_response(observations: List[Dict]) -> StreamingResponse: | |
| def iter_csv(data): | |
| output = StringIO() | |
| writer = csv.DictWriter(output, fieldnames=data[0].keys() if data else []) | |
| writer.writeheader() | |
| for row in data: | |
| writer.writerow(row) | |
| output.seek(0) | |
| yield output.read() | |
| headers = { | |
| 'Content-Disposition': 'attachment; filename="observations.csv"' | |
| } | |
| return StreamingResponse(iter_csv(observations), media_type="text/csv", headers=headers) | |
| async def get_last_observations(limit: int = 10, format: str = "json"): | |
| observability_manager = LLMObservabilityManager() | |
| try: | |
| # Get all observations, sorted by created_at in descending order | |
| all_observations = observability_manager.get_observations() | |
| all_observations.sort(key=lambda x: x['created_at'], reverse=True) | |
| # Get the last conversation_id | |
| if all_observations: | |
| last_conversation_id = all_observations[0]['conversation_id'] | |
| # Filter observations for the last conversation | |
| last_conversation_observations = [ | |
| obs for obs in all_observations | |
| if obs['conversation_id'] == last_conversation_id | |
| ][:limit] | |
| if format.lower() == "csv": | |
| return create_csv_response(last_conversation_observations) | |
| else: | |
| return ObservationResponse(observations=last_conversation_observations) | |
| else: | |
| if format.lower() == "csv": | |
| return create_csv_response([]) | |
| else: | |
| return ObservationResponse(observations=[]) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Failed to retrieve observations: {str(e)}") | |
| async def get_all_unique_observations(limit: Optional[int] = None): | |
| observability_manager = LLMObservabilityManager() | |
| return ObservationResponse(observations=observability_manager.get_all_unique_conversation_observations(limit)) | |