Spaces:
Sleeping
Sleeping
| ## OBSERVABILITY | |
| from uuid import uuid4 | |
| import csv | |
| from io import StringIO | |
| from fastapi import APIRouter, HTTPException | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel | |
| from typing import List, Dict, Optional, Any | |
| from observability import LLMObservabilityManager | |
| router = APIRouter( | |
| prefix="/observability", | |
| tags=["observability"] | |
| ) | |
| class ObservationResponse(BaseModel): | |
| observations: List[Dict] | |
| def create_csv_response(observations: List[Dict]) -> StreamingResponse: | |
| def iter_csv(data): | |
| output = StringIO() | |
| writer = csv.DictWriter(output, fieldnames=data[0].keys() if data else []) | |
| writer.writeheader() | |
| for row in data: | |
| writer.writerow(row) | |
| output.seek(0) | |
| yield output.read() | |
| headers = { | |
| 'Content-Disposition': 'attachment; filename="observations.csv"' | |
| } | |
| return StreamingResponse(iter_csv(observations), media_type="text/csv", headers=headers) | |
| async def get_last_observations(limit: int = 10, format: str = "json"): | |
| observability_manager = LLMObservabilityManager() | |
| try: | |
| # Get all observations, sorted by created_at in descending order | |
| all_observations = observability_manager.get_observations() | |
| all_observations.sort(key=lambda x: x['created_at'], reverse=True) | |
| # Get the last conversation_id | |
| if all_observations: | |
| last_conversation_id = all_observations[0]['conversation_id'] | |
| # Filter observations for the last conversation | |
| last_conversation_observations = [ | |
| obs for obs in all_observations | |
| if obs['conversation_id'] == last_conversation_id | |
| ][:limit] | |
| if format.lower() == "csv": | |
| return create_csv_response(last_conversation_observations) | |
| else: | |
| return ObservationResponse(observations=last_conversation_observations) | |
| else: | |
| if format.lower() == "csv": | |
| return create_csv_response([]) | |
| else: | |
| return ObservationResponse(observations=[]) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Failed to retrieve observations: {str(e)}") | |
| async def get_all_unique_observations(limit: Optional[int] = None): | |
| observability_manager = LLMObservabilityManager() | |
| return ObservationResponse(observations=observability_manager.get_all_unique_conversation_observations(limit)) | |
| class StatisticsResponse(BaseModel): | |
| statistics: Dict[str, Any] | |
| async def get_dashboard_data( | |
| days: Optional[int] = 30, | |
| time_series_interval: str = 'day', | |
| ): | |
| """ | |
| Get comprehensive Analytics data including statistics. | |
| Args: | |
| days: Number of days to look back for statistics | |
| time_series_interval: Interval for time series data ('hour', 'day', 'week', 'month') | |
| """ | |
| try: | |
| observability_manager = LLMObservabilityManager() | |
| # Get dashboard statistics | |
| statistics = observability_manager.get_dashboard_statistics( | |
| days=days, | |
| time_series_interval=time_series_interval | |
| ) | |
| return StatisticsResponse( | |
| statistics=statistics, | |
| ) | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Failed to retrieve dashboard data: {str(e)}" | |
| ) | |