Spaces:
Sleeping
Sleeping
from abc import abstractmethod | |
from typing import Any, Callable, List | |
from overrides import EnforceOverrides, overrides | |
from chromadb.config import Component, System | |
from chromadb.types import Segment | |
class SegmentDirectory(Component): | |
"""A segment directory is a data interface that manages the location of segments. Concretely, this | |
means that for clustered chroma, it provides the grpc endpoint for a segment.""" | |
def get_segment_endpoint(self, segment: Segment) -> str: | |
"""Return the segment residence for a given segment ID""" | |
def register_updated_segment_callback( | |
self, callback: Callable[[Segment], None] | |
) -> None: | |
"""Register a callback that will be called when a segment is updated""" | |
pass | |
Memberlist = List[str] | |
class MemberlistProvider(Component, EnforceOverrides): | |
"""Returns the latest memberlist and provdes a callback for when it changes. This | |
callback may be called from a different thread than the one that called. Callers should ensure | |
that they are thread-safe.""" | |
callbacks: List[Callable[[Memberlist], Any]] | |
def __init__(self, system: System): | |
self.callbacks = [] | |
super().__init__(system) | |
def get_memberlist(self) -> Memberlist: | |
"""Returns the latest memberlist""" | |
pass | |
def set_memberlist_name(self, memberlist: str) -> None: | |
"""Sets the memberlist that this provider will watch""" | |
pass | |
def stop(self) -> None: | |
"""Stops watching the memberlist""" | |
self.callbacks = [] | |
def register_updated_memberlist_callback( | |
self, callback: Callable[[Memberlist], Any] | |
) -> None: | |
"""Registers a callback that will be called when the memberlist changes. May be called many times | |
with the same memberlist, so callers should be idempotent. May be called from a different thread. | |
""" | |
self.callbacks.append(callback) | |
def unregister_updated_memberlist_callback( | |
self, callback: Callable[[Memberlist], Any] | |
) -> bool: | |
"""Unregisters a callback that was previously registered. Returns True if the callback was | |
successfully unregistered, False if it was not ever registered.""" | |
if callback in self.callbacks: | |
self.callbacks.remove(callback) | |
return True | |
return False | |