Spaces:
Running
Running
mod assignment; | |
mod config; | |
mod errors; | |
mod index; | |
mod ingest; | |
mod memberlist; | |
mod segment; | |
mod server; | |
mod storage; | |
mod sysdb; | |
mod system; | |
mod types; | |
use config::Configurable; | |
use memberlist::MemberlistProvider; | |
use crate::sysdb::sysdb::SysDb; | |
mod chroma_proto { | |
tonic::include_proto!("chroma"); | |
} | |
pub async fn worker_entrypoint() { | |
let config = config::RootConfig::load(); | |
// Create all the core components and start them | |
// TODO: This should be handled by an Application struct and we can push the config into it | |
// for now we expose the config to pub and inject it into the components | |
// The two root components are ingest, and the gRPC server | |
let mut system: system::System = system::System::new(); | |
let mut ingest = match ingest::Ingest::try_from_config(&config.worker).await { | |
Ok(ingest) => ingest, | |
Err(err) => { | |
println!("Failed to create ingest component: {:?}", err); | |
return; | |
} | |
}; | |
let mut memberlist = | |
match memberlist::CustomResourceMemberlistProvider::try_from_config(&config.worker).await { | |
Ok(memberlist) => memberlist, | |
Err(err) => { | |
println!("Failed to create memberlist component: {:?}", err); | |
return; | |
} | |
}; | |
let mut scheduler = ingest::RoundRobinScheduler::new(); | |
let segment_manager = match segment::SegmentManager::try_from_config(&config.worker).await { | |
Ok(segment_manager) => segment_manager, | |
Err(err) => { | |
println!("Failed to create segment manager component: {:?}", err); | |
return; | |
} | |
}; | |
let mut segment_ingestor_receivers = | |
Vec::with_capacity(config.worker.num_indexing_threads as usize); | |
for _ in 0..config.worker.num_indexing_threads { | |
let segment_ingestor = segment::SegmentIngestor::new(segment_manager.clone()); | |
let segment_ingestor_handle = system.start_component(segment_ingestor); | |
let recv = segment_ingestor_handle.receiver(); | |
segment_ingestor_receivers.push(recv); | |
} | |
let mut worker_server = match server::WorkerServer::try_from_config(&config.worker).await { | |
Ok(worker_server) => worker_server, | |
Err(err) => { | |
println!("Failed to create worker server component: {:?}", err); | |
return; | |
} | |
}; | |
worker_server.set_segment_manager(segment_manager.clone()); | |
// Boot the system | |
// memberlist -> ingest -> scheduler -> NUM_THREADS x segment_ingestor -> segment_manager | |
// server <- segment_manager | |
for recv in segment_ingestor_receivers { | |
scheduler.subscribe(recv); | |
} | |
let mut scheduler_handler = system.start_component(scheduler); | |
ingest.subscribe(scheduler_handler.receiver()); | |
let mut ingest_handle = system.start_component(ingest); | |
let recv = ingest_handle.receiver(); | |
memberlist.subscribe(recv); | |
let mut memberlist_handle = system.start_component(memberlist); | |
let server_join_handle = tokio::spawn(async move { | |
crate::server::WorkerServer::run(worker_server).await; | |
}); | |
// Join on all handles | |
let _ = tokio::join!( | |
ingest_handle.join(), | |
memberlist_handle.join(), | |
scheduler_handler.join(), | |
); | |
} | |