import contextlib import os import os.path as osp import sys from typing import cast import imageio.v3 as iio import numpy as np import torch class Dust3rPipeline(object): def __init__(self, device: str | torch.device = "cuda"): submodule_path = osp.realpath( osp.join(osp.dirname(__file__), "../../third_party/dust3r/") ) if submodule_path not in sys.path: sys.path.insert(0, submodule_path) try: with open(os.devnull, "w") as f, contextlib.redirect_stdout(f): from dust3r.cloud_opt import ( # type: ignore[import] GlobalAlignerMode, global_aligner, ) from dust3r.image_pairs import make_pairs # type: ignore[import] from dust3r.inference import inference # type: ignore[import] from dust3r.model import AsymmetricCroCo3DStereo # type: ignore[import] from dust3r.utils.image import load_images # type: ignore[import] except ImportError: raise ImportError( "Missing required submodule: 'dust3r'. Please ensure that all submodules are properly set up.\n\n" "To initialize them, run the following command in the project root:\n" " git submodule update --init --recursive" ) self.device = torch.device(device) self.model = AsymmetricCroCo3DStereo.from_pretrained( "naver/DUSt3R_ViTLarge_BaseDecoder_512_dpt" ).to(self.device) self._GlobalAlignerMode = GlobalAlignerMode self._global_aligner = global_aligner self._make_pairs = make_pairs self._inference = inference self._load_images = load_images def infer_cameras_and_points( self, img_paths: list[str], Ks: list[list] = None, c2ws: list[list] = None, batch_size: int = 16, schedule: str = "cosine", lr: float = 0.01, niter: int = 500, min_conf_thr: int = 3, ) -> tuple[ list[np.ndarray], np.ndarray, np.ndarray, list[np.ndarray], list[np.ndarray] ]: num_img = len(img_paths) if num_img == 1: print("Only one image found, duplicating it to create a stereo pair.") img_paths = img_paths * 2 images = self._load_images(img_paths, size=512) pairs = self._make_pairs( images, scene_graph="complete", prefilter=None, symmetrize=True, ) output = self._inference(pairs, self.model, self.device, batch_size=batch_size) ori_imgs = [iio.imread(p) for p in img_paths] ori_img_whs = np.array([img.shape[1::-1] for img in ori_imgs]) img_whs = np.concatenate([image["true_shape"][:, ::-1] for image in images], 0) scene = self._global_aligner( output, device=self.device, mode=self._GlobalAlignerMode.PointCloudOptimizer, same_focals=True, optimize_pp=False, # True, min_conf_thr=min_conf_thr, ) # if Ks is not None: # scene.preset_focal( # torch.tensor([[K[0, 0], K[1, 1]] for K in Ks]) # ) if c2ws is not None: scene.preset_pose(c2ws) _ = scene.compute_global_alignment( init="msp", niter=niter, schedule=schedule, lr=lr ) imgs = cast(list, scene.imgs) Ks = scene.get_intrinsics().detach().cpu().numpy().copy() c2ws = scene.get_im_poses().detach().cpu().numpy() # type: ignore pts3d = [x.detach().cpu().numpy() for x in scene.get_pts3d()] # type: ignore if num_img > 1: masks = [x.detach().cpu().numpy() for x in scene.get_masks()] points = [p[m] for p, m in zip(pts3d, masks)] point_colors = [img[m] for img, m in zip(imgs, masks)] else: points = [p.reshape(-1, 3) for p in pts3d] point_colors = [img.reshape(-1, 3) for img in imgs] # Convert back to the original image size. imgs = ori_imgs Ks[:, :2, -1] *= ori_img_whs / img_whs Ks[:, :2, :2] *= (ori_img_whs / img_whs).mean(axis=1, keepdims=True)[..., None] return imgs, Ks, c2ws, points, point_colors