Spaces:
Sleeping
Sleeping
| from functools import wraps | |
| from enum import Enum | |
| from typing import Any, Callable, Dict, Optional, Sequence, Union | |
| from opentelemetry import trace | |
| from opentelemetry.sdk.resources import SERVICE_NAME, Resource | |
| from opentelemetry.sdk.trace import TracerProvider | |
| from opentelemetry.sdk.trace.export import ( | |
| BatchSpanProcessor, | |
| ) | |
| from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter | |
| from chromadb.config import Component | |
| from chromadb.config import System | |
class OpenTelemetryGranularity(Enum):
    """The granularity of the OpenTelemetry spans."""

    # No spans are emitted.
    NONE = "none"
    # Spans are emitted for each operation.
    OPERATION = "operation"
    # Spans are emitted for each operation and segment.
    OPERATION_AND_SEGMENT = "operation_and_segment"
    # Spans are emitted for almost every method call.
    ALL = "all"

    # Greater is more restrictive. So "all" < "operation" (and everything else),
    # "none" > everything.
    def __lt__(self, other: Any) -> bool:
        """Return True when ``self`` is strictly less restrictive than ``other``.

        Returns ``NotImplemented`` when ``other`` is not an
        ``OpenTelemetryGranularity``, so that Python raises its standard
        ``TypeError`` for unsupported comparisons instead of an incidental
        ``ValueError`` from the list lookup.
        """
        if not isinstance(other, OpenTelemetryGranularity):
            return NotImplemented
        # Ordered from least restrictive to most restrictive.
        order = [
            OpenTelemetryGranularity.ALL,
            OpenTelemetryGranularity.OPERATION_AND_SEGMENT,
            OpenTelemetryGranularity.OPERATION,
            OpenTelemetryGranularity.NONE,
        ]
        return order.index(self) < order.index(other)
class OpenTelemetryClient(Component):
    """Component that initializes OpenTelemetry from the system settings."""

    def __init__(self, system: System):
        """Read the ``chroma_otel_*`` settings and pass them to ``otel_init``."""
        super().__init__(system)
        settings = system.settings
        otel_init(
            settings.chroma_otel_service_name,
            settings.chroma_otel_collection_endpoint,
            settings.chroma_otel_collection_headers,
            # A falsy granularity setting is treated as "none".
            OpenTelemetryGranularity(settings.chroma_otel_granularity or "none"),
        )
# Module-level tracing state. Both names are rebound by otel_init() when a
# granularity other than NONE is requested; until then no tracer exists and
# the granularity is NONE.
tracer: Optional[trace.Tracer] = None
granularity: OpenTelemetryGranularity = OpenTelemetryGranularity.NONE
def otel_init(
    otel_service_name: Optional[str],
    otel_collection_endpoint: Optional[str],
    otel_collection_headers: Optional[Dict[str, str]],
    otel_granularity: OpenTelemetryGranularity,
) -> None:
    """Set up the module-level OpenTelemetry tracer and granularity.

    The parameters mirror the environment variables that configure OTel, as
    documented at https://docs.trychroma.com/observability.

    Args:
        otel_service_name: Service name used for OTel tagging and aggregation.
        otel_collection_endpoint: Endpoint that receives the OTel spans
            (e.g. api.honeycomb.com).
        otel_collection_headers: Headers sent along with the OTel spans
            (e.g. {"x-honeycomb-team": "abc123"}).
        otel_granularity: Granularity of the spans to emit. When this is
            ``NONE`` the function returns without changing any state.
    """
    global tracer, granularity

    # Tracing disabled: leave the module-level state untouched.
    if otel_granularity == OpenTelemetryGranularity.NONE:
        return

    service_resource = Resource(attributes={SERVICE_NAME: str(otel_service_name)})
    tracer_provider = TracerProvider(resource=service_resource)

    # TODO: we may eventually want to make this configurable.
    span_exporter = OTLPSpanExporter(
        endpoint=str(otel_collection_endpoint),
        headers=otel_collection_headers,
    )
    tracer_provider.add_span_processor(BatchSpanProcessor(span_exporter))
    trace.set_tracer_provider(tracer_provider)

    # Publish the tracer and granularity for trace_method() and friends.
    tracer = trace.get_tracer(__name__)
    granularity = otel_granularity
def trace_method(
    trace_name: str,
    trace_granularity: OpenTelemetryGranularity,
    attributes: Optional[
        Dict[
            str,
            Union[
                str,
                bool,
                float,
                int,
                Sequence[str],
                Sequence[bool],
                Sequence[float],
                Sequence[int],
            ],
        ]
    ] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Return a decorator that runs the decorated callable inside a span.

    Args:
        trace_name: Name given to the span.
        trace_granularity: Granularity level of this span. The span is
            skipped when this level compares less than the module-level
            ``granularity``.
        attributes: Optional attributes attached to the span when it starts.

    Returns:
        A decorator. The wrapped callable keeps the original's metadata
        (``__name__``, ``__doc__``, ``__wrapped__``) and returns whatever the
        original returns, whether or not a span was opened.
    """

    def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
        # Preserve the decorated callable's name and docstring so that
        # introspection and debugging output still refer to the original.
        @wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Module-level state is read at call time, so decoration may
            # happen before the tracer has been initialized.
            # Skip the span when it is filtered out by the configured
            # granularity or when no tracer has been set up.
            if trace_granularity < granularity or not tracer:
                return f(*args, **kwargs)
            with tracer.start_as_current_span(trace_name, attributes=attributes):
                return f(*args, **kwargs)

        return wrapper

    return decorator
def add_attributes_to_current_span(
    attributes: Dict[
        str,
        Union[
            str,
            bool,
            float,
            int,
            Sequence[str],
            Sequence[bool],
            Sequence[float],
            Sequence[int],
        ],
    ]
) -> None:
    """Attach ``attributes`` to the span that is currently active.

    Does nothing when the module-level granularity is ``NONE`` or when no
    tracer has been initialized.
    """
    tracing_disabled = granularity == OpenTelemetryGranularity.NONE
    if tracing_disabled or not tracer:
        return
    trace.get_current_span().set_attributes(attributes)