# chroma/chromadb/segment/__init__.py
# badalsahani's picture
# feat: chroma initial deploy
# 287a0bc
from typing import Optional, Sequence, TypeVar, Type
from abc import abstractmethod
from chromadb.types import (
Collection,
MetadataEmbeddingRecord,
Operation,
VectorEmbeddingRecord,
Where,
WhereDocument,
VectorQuery,
VectorQueryResult,
Segment,
SeqId,
Metadata,
)
from chromadb.config import Component, System
from uuid import UUID
from enum import Enum
class SegmentType(Enum):
    """Enumeration of segment types, each identified by a URN string value."""

    # URN under the "metadata" namespace.
    SQLITE = "urn:chroma:segment/metadata/sqlite"

    # URNs under the "vector" namespace (HNSW variants).
    HNSW_LOCAL_MEMORY = "urn:chroma:segment/vector/hnsw-local-memory"
    HNSW_LOCAL_PERSISTED = "urn:chroma:segment/vector/hnsw-local-persisted"
    HNSW_DISTRIBUTED = "urn:chroma:segment/vector/hnsw-distributed"
class SegmentImplementation(Component):
    """Base interface shared by all segment implementations.

    Declares the operations every segment must provide: construction from a
    ``System`` and a ``Segment`` record, counting embeddings, reporting the
    highest indexed SeqID, and deleting itself.
    """

    @abstractmethod
    def __init__(self, sytstem: System, segment: Segment):
        # NOTE(review): `sytstem` looks like a typo for `system`; the name is
        # kept unchanged so keyword callers are not broken.
        ...

    @abstractmethod
    def count(self) -> int:
        """Return how many embeddings this segment holds."""
        ...

    @abstractmethod
    def max_seqid(self) -> SeqId:
        """Return the largest SeqID this segment has indexed so far."""
        ...

    @staticmethod
    def propagate_collection_metadata(metadata: Metadata) -> Optional[Metadata]:
        """Select the subset of a metadata map that applies to the segment.

        Takes an arbitrary metadata map (e.g. from a collection), validates
        it, and returns whichever metadata (if any) should be applied to the
        segment. Validation errors will be reported to the user.

        This base implementation propagates nothing and returns ``None``.
        """
        return None

    @abstractmethod
    def delete(self) -> None:
        """Remove the segment together with all of its data."""
        ...
# Type variable bounded by SegmentImplementation; it lets
# SegmentManager.get_segment be annotated as returning an instance of the
# same segment class that was passed in as `type`.
S = TypeVar("S", bound=SegmentImplementation)
class MetadataReader(SegmentImplementation):
    """Interface for segments that serve embedding metadata."""

    @abstractmethod
    def get_metadata(
        self,
        where: Optional[Where] = None,
        where_document: Optional[WhereDocument] = None,
        ids: Optional[Sequence[str]] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
    ) -> Sequence[MetadataEmbeddingRecord]:
        """Query the segment for embedding metadata records.

        Every argument is optional and defaults to ``None``. The exact
        filtering and paging semantics of ``where``, ``where_document``,
        ``ids``, ``limit`` and ``offset`` are defined by the implementation.
        """
        ...
class VectorReader(SegmentImplementation):
    """Interface for segments that serve embedding vectors."""

    @abstractmethod
    def get_vectors(
        self, ids: Optional[Sequence[str]] = None
    ) -> Sequence[VectorEmbeddingRecord]:
        """Fetch embeddings from the segment.

        When ``ids`` is not supplied, every embedding in the segment is
        returned.
        """
        ...

    @abstractmethod
    def query_vectors(
        self, query: VectorQuery
    ) -> Sequence[Sequence[VectorQueryResult]]:
        """Run a nearest-neighbor search.

        For the vectors in ``query``, return their top-k nearest neighbors.
        """
        ...
class SegmentManager(Component):
    """Pluggable strategy interface for segment lifecycle management.

    Implementations decide how segments are created, looked up and
    instantiated when they are needed.
    """

    @abstractmethod
    def create_segments(self, collection: Collection) -> Sequence[Segment]:
        """Produce the segments a new collection needs.

        Only the segment data is returned; nothing is persisted to the SysDB.
        """
        ...

    @abstractmethod
    def delete_segments(self, collection_id: UUID) -> Sequence[UUID]:
        """Drop local state for every segment belonging to a collection.

        Returns the IDs of the affected segments. The SysDB is not updated.
        """
        ...

    # Future Note: To support time travel, add optional parameters to this method to
    # retrieve Segment instances that are bounded to events from a specific range of
    # time
    @abstractmethod
    def get_segment(self, collection_id: UUID, type: Type[S]) -> S:
        """Return the segment that should service queries for a collection.

        Implementations are expected to cache appropriately. Clients should
        call this method each time they need the segment instead of holding
        on to the result, which leaves the implementation in full control of
        which segment impls are in or out of memory at a given time.
        """
        ...

    @abstractmethod
    def hint_use_collection(self, collection_id: UUID, hint_type: Operation) -> None:
        """Tell the manager that a collection is about to be used.

        Allows segments to be preloaded as needed. This is purely a hint;
        implementations are free to ignore it.
        """
        ...