badalsahani's picture
feat: chroma initial deploy
287a0bc
mod assignment;
mod config;
mod errors;
mod index;
mod ingest;
mod memberlist;
mod segment;
mod server;
mod storage;
mod sysdb;
mod system;
mod types;
use config::Configurable;
use memberlist::MemberlistProvider;
use crate::sysdb::sysdb::SysDb;
mod chroma_proto {
tonic::include_proto!("chroma");
}
pub async fn worker_entrypoint() {
let config = config::RootConfig::load();
// Create all the core components and start them
// TODO: This should be handled by an Application struct and we can push the config into it
// for now we expose the config to pub and inject it into the components
// The two root components are ingest, and the gRPC server
let mut system: system::System = system::System::new();
let mut ingest = match ingest::Ingest::try_from_config(&config.worker).await {
Ok(ingest) => ingest,
Err(err) => {
println!("Failed to create ingest component: {:?}", err);
return;
}
};
let mut memberlist =
match memberlist::CustomResourceMemberlistProvider::try_from_config(&config.worker).await {
Ok(memberlist) => memberlist,
Err(err) => {
println!("Failed to create memberlist component: {:?}", err);
return;
}
};
let mut scheduler = ingest::RoundRobinScheduler::new();
let segment_manager = match segment::SegmentManager::try_from_config(&config.worker).await {
Ok(segment_manager) => segment_manager,
Err(err) => {
println!("Failed to create segment manager component: {:?}", err);
return;
}
};
let mut segment_ingestor_receivers =
Vec::with_capacity(config.worker.num_indexing_threads as usize);
for _ in 0..config.worker.num_indexing_threads {
let segment_ingestor = segment::SegmentIngestor::new(segment_manager.clone());
let segment_ingestor_handle = system.start_component(segment_ingestor);
let recv = segment_ingestor_handle.receiver();
segment_ingestor_receivers.push(recv);
}
let mut worker_server = match server::WorkerServer::try_from_config(&config.worker).await {
Ok(worker_server) => worker_server,
Err(err) => {
println!("Failed to create worker server component: {:?}", err);
return;
}
};
worker_server.set_segment_manager(segment_manager.clone());
// Boot the system
// memberlist -> ingest -> scheduler -> NUM_THREADS x segment_ingestor -> segment_manager
// server <- segment_manager
for recv in segment_ingestor_receivers {
scheduler.subscribe(recv);
}
let mut scheduler_handler = system.start_component(scheduler);
ingest.subscribe(scheduler_handler.receiver());
let mut ingest_handle = system.start_component(ingest);
let recv = ingest_handle.receiver();
memberlist.subscribe(recv);
let mut memberlist_handle = system.start_component(memberlist);
let server_join_handle = tokio::spawn(async move {
crate::server::WorkerServer::run(worker_server).await;
});
// Join on all handles
let _ = tokio::join!(
ingest_handle.join(),
memberlist_handle.join(),
scheduler_handler.join(),
);
}