"""python3.9 -m mia.fpv.get_fpv --cfg mia/conf/example.yaml""" | |
import argparse
import asyncio
import copy
import itertools
import json
import traceback
import tracemalloc
from functools import partial
from pathlib import Path
from typing import Dict

import numpy as np
import pandas as pd
from omegaconf import OmegaConf
from tqdm import tqdm

from .. import logger
from .download import (
    MapillaryDownloader,
    fetch_image_infos,
    fetch_images_pixels,
    get_city_boundary,
    get_tiles_from_boundary,
)
from .filters import in_shape_filter, FilterPipeline
from .geo import Projection
from .prepare import process_sequence, default_cfg


class JSONEncoder(json.JSONEncoder):
    """JSON encoder that converts numpy arrays and scalars to native Python types."""

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, np.generic):
            return obj.item()
        return json.JSONEncoder.default(self, obj)


def write_json(path, data):
    with open(path, "w") as f:
        json.dump(data, f, cls=JSONEncoder)


def get_token(token: str) -> str:
    if Path(token).is_file():
        logger.info(f"Reading token from file {token}")
        with open(token, 'r') as file:
            token = file.read().strip()
    if not token.startswith("MLY"):
        logger.fatal(f"The token '{token}' is invalid")
        exit(1)
    else:
        logger.info(f"Using token {token}")
    return token
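
# Illustrative usage of get_token() (placeholder values, not real credentials):
# get_token("MLY|1234|abcd") returns the string unchanged, while
# get_token("mapillary_key") reads the token from that file if it exists;
# any token that does not start with "MLY" aborts the script.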


def fetch_city_boundaries(cities: list):
    """
    Args:
        cities: List of dictionaries describing the cities/regions to fetch, in the fpv.yaml format.
    """
    data = []
    pbar = tqdm(cities)
    for loc_info in pbar:
        loc_fmt = loc_info["name"]
        if "state" in loc_info:
            loc_fmt = f"{loc_fmt}, {loc_info['state']}"
        else:
            loc_info["state"] = ""
        if "country" in loc_info:
            loc_fmt = f"{loc_fmt}, {loc_info['country']}"
        else:
            loc_info["country"] = ""
        pbar.set_description(f"Getting boundary for {loc_fmt}")
        entry = copy.copy(dict(loc_info))
        get_city_boundary_ = partial(get_city_boundary,
                                     loc_info["name"], loc_info["state"], loc_info["country"])
        if "bound_type" not in loc_info:
            assert "sequence_ids" in loc_info
            raise NotImplementedError()
        elif loc_info["bound_type"] == "custom_bbox":
            assert "custom_bbox" in loc_info
            entry["bbox"] = dict(zip(["west", "south", "east", "north"],
                                     [float(x) for x in loc_info["custom_bbox"].split(",")]))
        elif loc_info["bound_type"] == "auto_shape":
            entry["bbox"], entry["shape"] = get_city_boundary_(fetch_shape=True)
        elif loc_info["bound_type"] == "auto_bbox":
            entry["bbox"] = get_city_boundary_(fetch_shape=False)
        elif loc_info["bound_type"] == "custom_size":
            assert "custom_size" in loc_info
            custom_size = loc_info["custom_size"]
            bbox = get_city_boundary_(fetch_shape=False)
            # Convert the km offset to degrees using ~111.32 km per degree of latitude
            # (scaled by cos(lat) for longitude). This is obviously not very accurate,
            # but it is good enough for small bounding boxes: e.g. custom_size=5 at 40°
            # latitude widens the box by about 0.059° of longitude and 0.045° of
            # latitude on each side.
            bbox_center = [(bbox['west'] + bbox['east']) / 2, (bbox['south'] + bbox['north']) / 2]
            bbox['west'] = bbox_center[0] - custom_size / (111.32 * np.cos(np.deg2rad(bbox_center[1])))
            bbox['east'] = bbox_center[0] + custom_size / (111.32 * np.cos(np.deg2rad(bbox_center[1])))
            bbox['south'] = bbox_center[1] - custom_size / 111.32
            bbox['north'] = bbox_center[1] + custom_size / 111.32
            entry["bbox"] = bbox
            entry["custom_size"] = custom_size
        else:
            raise Exception(f"Unsupported bound_type '{loc_info['bound_type']}'")
        data.append(entry)
    return data
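
# For reference: each entry returned by fetch_city_boundaries() is the original city
# dict plus a "bbox" dict keyed by west/south/east/north (degrees), a "shape" geometry
# when bound_type == "auto_shape", and "custom_size" when bound_type == "custom_size".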


def geojson_feature_list_to_pandas(feature_list, split_coords=True):
    t = pd.json_normalize(feature_list)
    cols_to_drop = ["type", "geometry.type", "properties.organization_id", "computed_geometry.type"]
    if split_coords:
        t[['geometry.long', 'geometry.lat']] = pd.DataFrame(t["geometry.coordinates"].tolist(), index=t.index)
        # The computed geometry may be NaN if it is not available, so keep the value
        # as-is when it is a float (NaN) and otherwise unpack the coordinate pair.
        if "computed_geometry.coordinates" in t.columns:
            t["computed_geometry.long"] = t["computed_geometry.coordinates"].map(lambda x: x if isinstance(x, float) else x[0])
            t["computed_geometry.lat"] = t["computed_geometry.coordinates"].map(lambda x: x if isinstance(x, float) else x[1])
    t.drop(columns=cols_to_drop, inplace=True, errors="ignore")
    t.columns = t.columns.str.removeprefix('properties.')
    t["id"] = t["id"].astype(str)
    return t


def parse_image_points_json_data(rd: dict, combine=True):
    """
    Parse the per-tile GeoJSON feature lists into pandas dataframes.

    Returns a single concatenated dataframe if combine is True, otherwise a
    dictionary mapping each tile to its dataframe.
    """
    df_dict = dict()
    for tile, feature_list in tqdm(rd.items(), total=len(rd)):
        if len(feature_list) == 0:
            continue
        df_dict[tile] = geojson_feature_list_to_pandas(feature_list)
    if combine:
        logger.info("Joining all dataframes into one.")
        return pd.concat(df_dict.values())
    else:
        return df_dict


def log_memory_usage():
    current, peak = tracemalloc.get_traced_memory()
    current_gb = current / 10**9
    peak_gb = peak / 10**9
    logger.info(f"Current memory: {current_gb:.3f} GB; Peak was {peak_gb:.3f} GB")


def main(args, cfgs):
    pipeline = FilterPipeline.load_from_yaml(cfgs.fpv_options.filter_pipeline_cfg)

    # setup the mapillary downloader
    tracemalloc.start()
    token = get_token(args.token)
    downloader = MapillaryDownloader(token)
    loop = asyncio.get_event_loop()

    # setup file structure
    dataset_dir = Path(cfgs.dataset_dir)
    dataset_dir.mkdir(exist_ok=True, parents=True)

    # Fetch the bounds for the cities
    logger.info("Auto-fetching boundaries for cities if needed.")
    cities_bounds_info = fetch_city_boundaries(cfgs.cities)
    log_memory_usage()

    # loop through the cities and collect the mapillary data (images, metadata, etc.)
    for city_boundary_info in cities_bounds_info:
        # Clear out the dataframes since we use None checks to decide whether we need
        # to load the dataframe for a particular stage.
        df = None
        df_meta = None
        df_meta_filtered = None
        df_meta_filtered_processed = None

        logger.info(f"Processing {city_boundary_info['name']}")

        # setup the directories
        location_name = city_boundary_info['name'].lower().replace(" ", "_")
        location_dir = dataset_dir / location_name
        infos_dir = location_dir / "image_infos_chunked"
        raw_image_dir = location_dir / "images_raw"
        out_image_dir = location_dir / "images"
        for d in (infos_dir, raw_image_dir, out_image_dir, location_dir):
            if not d.exists():
                logger.info(f"{d} does not exist. Creating directory {d}")
                d.mkdir(parents=True, exist_ok=True)
        write_json(location_dir / "boundary_info.json", city_boundary_info)

        # Stage 1: collect the ids of the images in the specified bounding box
        if cfgs.fpv_options.stages.get_image_points_from_tiles:
            logger.info(f"[{location_name}] Stage 1 (Downloading image IDs) ------------------")
            tiles = get_tiles_from_boundary(city_boundary_info)
            logger.info(f"[{location_name}] Found {len(tiles)} zoom-14 tiles for this boundary. Starting image point download")
            image_points_response = loop.run_until_complete(
                downloader.get_tiles_image_points(tiles)
            )
            if image_points_response is None:
                logger.warn(f"[{location_name}] No image points found in boundary. Skipping city")
                continue
            write_json(location_dir / 'images_points_dump.json', image_points_response)

            # parse the data into a pandas dataframe
            logger.info(f"[{location_name}] Parsing image point json data into dataframe")
            df = parse_image_points_json_data(image_points_response)

            # Filter if needed
            if city_boundary_info["bound_type"] == "auto_shape":
                old_count = df.shape[0]
                df = df[in_shape_filter(df, city_boundary_info["shape"])]
                new_count = df.shape[0]
                logger.info(f"[{location_name}] Keeping {new_count}/{old_count} ({new_count/old_count*100:.2f}%) "
                            "points that are within city boundaries")
            df.to_parquet(location_dir / 'image_points.parquet')

        # Stage 2: download the metadata
        if cfgs.fpv_options.stages.get_metadata:
            logger.info(f"[{location_name}] Stage 2 (Downloading Metadata) ------------------")
            if df is None:
                pq_name = 'image_points.parquet'
                df = pd.read_parquet(location_dir / pq_name)
                logger.info(f"[{location_name}] Loaded {df.shape[0]} image points from {pq_name}")
            log_memory_usage()

            # chunk settings
            chunk_size = cfgs.fpv_options.metadata_download_chunk_size
            num_split = int(np.ceil(df.shape[0] / chunk_size))
            logger.info(f"[{location_name}] Splitting the {df.shape[0]} image points into {num_split} chunks of {chunk_size} image points each.")

            # check whether some metadata chunks have already been downloaded
            num_downloaded_chunks = 0
            num_of_chunks_in_dir = len(list(infos_dir.glob("image_metadata_chunk_*.parquet")))
            df_meta_chunks = list()
            df_meta = pd.DataFrame()
            if infos_dir.exists() and num_of_chunks_in_dir > 0:
                logger.info(f"[{location_name}] Found {num_of_chunks_in_dir} existing metadata chunks.")
                num_downloaded_data_pts = 0
                pbar = tqdm(infos_dir.glob("image_metadata_chunk_*.parquet"), total=num_of_chunks_in_dir)
                for chunk_fp in pbar:
                    pbar.set_description(f"Loading {chunk_fp}")
                    chunk_df = pd.read_parquet(chunk_fp)
                    df_meta_chunks.append(chunk_df)
                    num_downloaded_chunks += 1
                    num_downloaded_data_pts += len(chunk_df)
                log_memory_usage()
                num_pts_left = df.shape[0] - num_downloaded_data_pts
                df_meta = pd.concat(df_meta_chunks)
                df_meta_chunks.clear()
                df = df[~df["id"].isin(df_meta["id"])]

                # some quick checks to make sure the data is consistent
                left_num_split = int(np.ceil(df.shape[0] / chunk_size))
                # if num_downloaded_chunks != (num_split - left_num_split):
                #     raise ValueError(f"Number of downloaded chunks {num_downloaded_chunks} does not match the number of chunks {num_split - left_num_split}")
                if num_pts_left != len(df):
                    raise ValueError(f"Number of points left {num_pts_left} does not match the number of points in the dataframe {len(df)}")
                if num_pts_left > 0:
                    logger.info(f"Restarting metadata download with {num_pts_left} points, {left_num_split} chunks left to download.")

            # download the metadata
            num_split = int(np.ceil(df.shape[0] / chunk_size))
            groups = df.groupby(np.arange(len(df.index)) // chunk_size)
            for (frame_num, frame) in groups:
                frame_num = frame_num + num_downloaded_chunks
                logger.info(f"[{location_name}] Fetching metadata for chunk {frame_num+1}/{num_split} ({frame.shape[0]} image points).")
                image_ids = frame["id"]
                image_infos, num_fail = loop.run_until_complete(
                    fetch_image_infos(image_ids, downloader, infos_dir)
                )
                logger.info("%d failures (%.1f%%).", num_fail, 100 * num_fail / len(image_ids))
                if num_fail == len(image_ids):
                    logger.warn(f"[{location_name}] All images failed to be fetched. Skipping next steps")
                    continue
                new_df_meta = geojson_feature_list_to_pandas(image_infos.values())
                df_meta_chunks.append(new_df_meta)
                new_df_meta.to_parquet(infos_dir / f'image_metadata_chunk_{frame_num}.parquet')
                log_memory_usage()

            # Combine all new chunks into one dataframe
            df_meta = pd.concat([df_meta] + df_meta_chunks)
            df_meta_chunks.clear()

            # Some standardization of the data
            df_meta["model"] = df_meta["model"].str.lower().str.replace(' ', '').str.replace('_', '')
            df_meta["make"] = df_meta["make"].str.lower().str.replace(' ', '').str.replace('_', '')
            df_meta.to_parquet(location_dir / 'image_metadata.parquet')

        # Stage 3: run filter pipeline
        if cfgs.fpv_options.stages.run_filter:
            logger.info(f"[{location_name}] Stage 3 (Filtering) ------------------")
            if df_meta is None:
                pq_name = 'image_metadata.parquet'
                df_meta = pd.read_parquet(location_dir / pq_name)
                logger.info(f"[{location_name}] Loaded {df_meta.shape[0]} image metadata from {pq_name}")
            df_meta_filtered = pipeline(df_meta)
            df_meta_filtered.to_parquet(location_dir / 'image_metadata_filtered.parquet')
            if df_meta_filtered.shape[0] == 0:
                logger.warning(f"[{location_name}] No images to download. Moving on to next location.")
                continue
            else:
                logger.info(f"[{location_name}] {df_meta_filtered.shape[0]} images to download.")

        # Stage 4: Download filtered images
        if cfgs.fpv_options.stages.download_images:
            logger.info(f"[{location_name}] Stage 4 (Downloading Images) ------------------")
            if df_meta_filtered is None:
                pq_name = 'image_metadata_filtered.parquet'
                df_meta_filtered = pd.read_parquet(location_dir / pq_name)
                logger.info(f"[{location_name}] Loaded {df_meta_filtered.shape[0]} image metadata from {pq_name}")
            log_memory_usage()

            # filter out the images that have already been downloaded
            downloaded_image_fps = list(raw_image_dir.glob("*.jpg"))
            downloaded_image_ids = [fp.stem for fp in downloaded_image_fps]
            df_to_download = df_meta_filtered[~df_meta_filtered["id"].isin(downloaded_image_ids)]
            logger.info(f"[{location_name}] {len(downloaded_image_ids)} images already downloaded. {df_to_download.shape[0]} images left to download.")

            # download the images
            image_urls = list(df_to_download.set_index("id")["thumb_2048_url"].items())
            if len(image_urls) > 0:
                num_fail = loop.run_until_complete(
                    fetch_images_pixels(image_urls, downloader, raw_image_dir)
                )
                logger.info("%d failures (%.1f%%).", num_fail, 100 * num_fail / len(image_urls))

        # Stage 5: process the sequences
        if cfgs.fpv_options.stages.to_process_sequence:
            logger.info(f"[{location_name}] Stage 5 (Sequence Processing) ------------------")
            if df_meta_filtered is None:
                pq_name = 'image_metadata_filtered.parquet'
                df_meta_filtered = pd.read_parquet(location_dir / pq_name)
                logger.info(f"[{location_name}] Loaded {df_meta_filtered.shape[0]} image metadata from {pq_name}")
            log_memory_usage()

            # prepare the data for processing
            seq_to_image_ids = df_meta_filtered.groupby('sequence')['id'].agg(list).to_dict()
            lon_center = (city_boundary_info['bbox']['east'] + city_boundary_info['bbox']['west']) / 2
            lat_center = (city_boundary_info['bbox']['north'] + city_boundary_info['bbox']['south']) / 2
            # increase the max extent of the projection to 50 km, otherwise it will throw an error
            projection = Projection(lat_center, lon_center, max_extent=50e3)
            df_meta_filtered.index = df_meta_filtered["id"]
            image_infos = df_meta_filtered.to_dict(orient="index")
            process_sequence_args = default_cfg
            log_memory_usage()

            # process the sequences
            dump = {}
            logger.info(f"[{location_name}] Processing downloaded sequences...")
            processed_ids = list()
            for seq_id, seq_image_ids in tqdm(seq_to_image_ids.items()):
                try:
                    d, pi = process_sequence(
                        seq_image_ids,
                        image_infos,
                        projection,
                        process_sequence_args,
                        raw_image_dir,
                        out_image_dir,
                    )
                    if d is None or pi is None:
                        raise Exception("process_sequence returned None")
                    processed_ids.append(pi)
                    # TODO: We shouldn't need dumps
                    dump.update(d)
                except Exception as e:
                    logger.error(f"[{location_name}] Failed to process sequence {seq_id}, skipping it. Error: {repr(e)}.")
                    logger.error(traceback.format_exc())
            write_json(location_dir / "dump.json", dump)

            # TODO: Ideally we want to move the keyframe selection filter into the
            # filtering pipeline so that we do not download unnecessary raw images.
            # For now, we filter the dataframe one more time after processing.
            processed_ids = list(itertools.chain.from_iterable(processed_ids))
            df_meta_filtered_processed = df_meta_filtered[df_meta_filtered["id"].isin(processed_ids)]
            logger.info(f"[{location_name}] Final yield after processing is {df_meta_filtered_processed.shape[0]} images.")
            df_meta_filtered_processed.to_parquet(location_dir / 'image_metadata_filtered_processed.parquet')


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--cfg", type=str, default="mia/conf/example.yaml",
                        help="Path to config yaml file.")
    parser.add_argument("--token", type=str, default='mapillary_key',
                        help="Either a token string or a path to a file containing the token.")
    args = parser.parse_args()
    cfgs = OmegaConf.load(args.cfg)
    main(args, cfgs)