File size: 3,278 Bytes
287a0bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
mod assignment;
mod config;
mod errors;
mod index;
mod ingest;
mod memberlist;
mod segment;
mod server;
mod storage;
mod sysdb;
mod system;
mod types;

use config::Configurable;
use memberlist::MemberlistProvider;

use crate::sysdb::sysdb::SysDb;

mod chroma_proto {
    tonic::include_proto!("chroma");
}

pub async fn worker_entrypoint() {
    let config = config::RootConfig::load();
    // Create all the core components and start them
    // TODO: This should be handled by an Application struct and we can push the config into it
    // for now we expose the config to pub and inject it into the components

    // The two root components are ingest, and the gRPC server
    let mut system: system::System = system::System::new();

    let mut ingest = match ingest::Ingest::try_from_config(&config.worker).await {
        Ok(ingest) => ingest,
        Err(err) => {
            println!("Failed to create ingest component: {:?}", err);
            return;
        }
    };

    let mut memberlist =
        match memberlist::CustomResourceMemberlistProvider::try_from_config(&config.worker).await {
            Ok(memberlist) => memberlist,
            Err(err) => {
                println!("Failed to create memberlist component: {:?}", err);
                return;
            }
        };

    let mut scheduler = ingest::RoundRobinScheduler::new();

    let segment_manager = match segment::SegmentManager::try_from_config(&config.worker).await {
        Ok(segment_manager) => segment_manager,
        Err(err) => {
            println!("Failed to create segment manager component: {:?}", err);
            return;
        }
    };

    let mut segment_ingestor_receivers =
        Vec::with_capacity(config.worker.num_indexing_threads as usize);
    for _ in 0..config.worker.num_indexing_threads {
        let segment_ingestor = segment::SegmentIngestor::new(segment_manager.clone());
        let segment_ingestor_handle = system.start_component(segment_ingestor);
        let recv = segment_ingestor_handle.receiver();
        segment_ingestor_receivers.push(recv);
    }

    let mut worker_server = match server::WorkerServer::try_from_config(&config.worker).await {
        Ok(worker_server) => worker_server,
        Err(err) => {
            println!("Failed to create worker server component: {:?}", err);
            return;
        }
    };
    worker_server.set_segment_manager(segment_manager.clone());

    // Boot the system
    // memberlist -> ingest -> scheduler -> NUM_THREADS x segment_ingestor -> segment_manager
    // server <- segment_manager

    for recv in segment_ingestor_receivers {
        scheduler.subscribe(recv);
    }

    let mut scheduler_handler = system.start_component(scheduler);
    ingest.subscribe(scheduler_handler.receiver());

    let mut ingest_handle = system.start_component(ingest);
    let recv = ingest_handle.receiver();
    memberlist.subscribe(recv);
    let mut memberlist_handle = system.start_component(memberlist);

    let server_join_handle = tokio::spawn(async move {
        crate::server::WorkerServer::run(worker_server).await;
    });

    // Join on all handles
    let _ = tokio::join!(
        ingest_handle.join(),
        memberlist_handle.join(),
        scheduler_handler.join(),
    );
}