File size: 2,442 Bytes
287a0bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from abc import abstractmethod
from typing import Any, Callable, List

from overrides import EnforceOverrides, overrides
from chromadb.config import Component, System
from chromadb.types import Segment


class SegmentDirectory(Component):
    """A segment directory is a data interface that manages the location of segments. Concretely, this
    means that for clustered chroma, it provides the grpc endpoint for a segment."""

    @abstractmethod
    def get_segment_endpoint(self, segment: Segment) -> str:
        """Return the segment residence for a given segment ID"""

    @abstractmethod
    def register_updated_segment_callback(
        self, callback: Callable[[Segment], None]
    ) -> None:
        """Register a callback that will be called when a segment is updated"""
        pass


Memberlist = List[str]


class MemberlistProvider(Component, EnforceOverrides):
    """Returns the latest memberlist and provdes a callback for when it changes. This
    callback may be called from a different thread than the one that called. Callers should ensure
    that they are thread-safe."""

    callbacks: List[Callable[[Memberlist], Any]]

    def __init__(self, system: System):
        self.callbacks = []
        super().__init__(system)

    @abstractmethod
    def get_memberlist(self) -> Memberlist:
        """Returns the latest memberlist"""
        pass

    @abstractmethod
    def set_memberlist_name(self, memberlist: str) -> None:
        """Sets the memberlist that this provider will watch"""
        pass

    @overrides
    def stop(self) -> None:
        """Stops watching the memberlist"""
        self.callbacks = []

    def register_updated_memberlist_callback(
        self, callback: Callable[[Memberlist], Any]
    ) -> None:
        """Registers a callback that will be called when the memberlist changes. May be called many times
        with the same memberlist, so callers should be idempotent. May be called from a different thread.
        """
        self.callbacks.append(callback)

    def unregister_updated_memberlist_callback(
        self, callback: Callable[[Memberlist], Any]
    ) -> bool:
        """Unregisters a callback that was previously registered. Returns True if the callback was
        successfully unregistered, False if it was not ever registered."""
        if callback in self.callbacks:
            self.callbacks.remove(callback)
            return True
        return False