""" | |
Example usage: | |
python3.9 -m mapper.data.debug.calc_stats -d /ocean/projects/cis220039p/shared/map_perception/dataset_v0 | |
""" | |
from datetime import datetime, timezone, timedelta
import argparse
import os
from pathlib import Path
import json

from astral import LocationInfo
from astral.sun import sun
from timezonefinder import TimezoneFinder
import numpy as np
import pandas as pd
import geopandas as gpd
from matplotlib import pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import tqdm

from ..fpv import filters
from .. import logger


def is_daytime(timestamp, latitude, longitude):
    """Return True if the UTC `timestamp` (in seconds) falls between local
    sunrise and sunset at (`latitude`, `longitude`)."""
    # Create a LocationInfo object for the given latitude and longitude
    tz_str = TimezoneFinder().timezone_at(lng=longitude, lat=latitude)
    location = LocationInfo(name="", region="", timezone=tz_str,
                            latitude=latitude, longitude=longitude)
    # Convert the timestamp to a timezone-aware datetime object
    dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    # We query one day before and one day after to avoid timezone ambiguities.
    # Our query timestamp is guaranteed to fall into one of those 3 dates.
    # Astral sometimes returns sunrises or sunsets that are not from the queried date;
    # see https://github.com/sffjunkie/astral/issues/83
    d0 = (dt - timedelta(days=1)).date()
    d1 = dt.date()
    d2 = (dt + timedelta(days=1)).date()
    # Calculate sunrise and sunset times for all three dates
    times = []
    for d in [d0, d1, d2]:
        s = sun(location.observer, date=d)
        times.append((s["sunrise"], "sunrise"))
        times.append((s["sunset"], "sunset"))
    # Need to sort because there is no particular order in which
    # sunrise is always before sunset or vice versa.
    times = sorted(times, key=lambda x: x[0])
    assert times[-1][0] > dt > times[0][0]
    # The timestamp is daytime iff the most recent event before it was a sunrise.
    for i in range(1, len(times)):
        if dt < times[i][0]:
            prev_event = times[i - 1][1]
            break
    return prev_event == "sunrise"
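
# A minimal usage sketch (hypothetical coordinates and timestamp; the exact
# result depends on the sunrise/sunset times astral computes for that date
# and place):
#
#   is_daytime(1672560000, 40.44, -79.99)  # 2023-01-01 08:00 UTC is ~03:00 local
#                                          # near Pittsburgh -> expected False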


def calculate_occupancy_map(df: pd.DataFrame, bev_meter_coverage=112, meters_per_pixel=112):
    """
    Args:
        bev_meter_coverage: How many meters each BEV in the dataframe covers.
        meters_per_pixel: Resolution at which to initialize the occupancy map.
            It need not match the BEV resolution; matching it would be the most
            accurate but unnecessarily slow.
    """
    # Convert the pandas dataframe to a geopandas dataframe
    gdf = gpd.GeoDataFrame(df,
                           geometry=gpd.points_from_xy(
                               df["computed_geometry.long"],
                               df["computed_geometry.lat"]),
                           crs=4326)
    # Project to the local UTM zone so coordinates are in meters
    utm_crs = gdf.estimate_utm_crs()
    gdf_utm = gdf.to_crs(utm_crs)
    # Pad the bounds by one BEV so footprints at the border stay inside the map
    left = gdf_utm.geometry.x.min() - bev_meter_coverage
    right = gdf_utm.geometry.x.max() + bev_meter_coverage
    bottom = gdf_utm.geometry.y.min() - bev_meter_coverage
    top = gdf_utm.geometry.y.max() + bev_meter_coverage
    width = right - left
    height = top - bottom
    width_pixels = int(width // meters_per_pixel)
    height_pixels = int(height // meters_per_pixel)
    if bev_meter_coverage % meters_per_pixel != 0:
        logger.warning(f"bev_meter_coverage {bev_meter_coverage} is not divisible by meters_per_pixel "
                       f"{meters_per_pixel}. Occupancy may be overestimated.")
    bev_pixels = int(np.ceil(bev_meter_coverage / meters_per_pixel))
    logger.info(f"Initializing {height_pixels}x{width_pixels} occupancy map. "
                f"Using {bev_pixels}x{bev_pixels} pixels for each BEV.")
    occupancy_map = np.zeros((height_pixels, width_pixels), dtype=bool)
    for row in gdf_utm.itertuples():
        utm_x = row.geometry.x
        utm_y = row.geometry.y
        img_x = int((utm_x - left) // meters_per_pixel)
        img_y = int((utm_y - bottom) // meters_per_pixel)
        # Mark the BEV footprint centered on this image location as occupied
        bev_pixels_left = bev_pixels // 2
        bev_pixels_right = bev_pixels - bev_pixels_left
        occupancy_map[img_y - bev_pixels_left: img_y + bev_pixels_right,
                      img_x - bev_pixels_left: img_x + bev_pixels_right] = True
    return occupancy_map
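
# A minimal sketch of how the occupancy map is consumed below (hypothetical
# coordinates; assumes the `computed_geometry.*` columns produced upstream):
#
#   df = pd.DataFrame({"computed_geometry.long": [-79.99, -79.98],
#                      "computed_geometry.lat": [40.44, 40.45]})
#   occ = calculate_occupancy_map(df, bev_meter_coverage=112, meters_per_pixel=7)
#   area_km2 = occ.sum() * 7 ** 2 * 1e-6  # pixel count * m^2 per pixel -> km^2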
if __name__ == "__main__": | |
parser = argparse.ArgumentParser() | |
parser.add_argument("--dataset_dir", '-d', type=str, required=True, help="Dataset directory") | |
parser.add_argument("--locations", '-l', type=str, default="all", | |
help="Location names in CSV format. Set to 'all' to traverse all locations.") | |
parser.add_argument("--plot", action="store_true", help="Store plots per location in PDFs") | |
parser.add_argument("--output", "-o", default=None, type=str, help="output json file to store statistics") | |
args = parser.parse_args() | |
locations = list() | |
if args.locations.lower() == "all": | |
locations = os.listdir(args.dataset_dir) | |
locations = [l for l in locations if os.path.isdir(os.path.join(args.dataset_dir, l))] | |
else: | |
locations = args.locations.split(",") | |
logger.info(f"Parsing {len(locations)} locations..") | |
all_locs_stats = dict() | |
for location in tqdm.tqdm(locations): | |
dataset_dir = Path(args.dataset_dir) | |
location_dir = dataset_dir / location | |
bev_dir = location_dir / "bev_raw" | |
semantic_mask_dir = location_dir / "semantic_masks" | |
osm_cache_dir = location_dir / "osm_cache" | |
pq_name = 'image_metadata_filtered_processed.parquet' | |
df = pd.read_parquet(location_dir / pq_name) | |
df = df[df["computed_geometry.lat"].notna()] | |
df = df[df["computed_geometry.long"].notna()] | |
logger.info(f"Loaded {df.shape[0]} image metadata from {location}") | |
# Calc derrivative attributes | |
tqdm.tqdm.pandas() | |
df["loc_descrip"] = filters.haversine_np( | |
lon1=df["geometry.long"], lat1=df["geometry.lat"], | |
lon2=df["computed_geometry.long"], lat2=df["computed_geometry.lat"] | |
) | |
df["angle_descrip"] = filters.angle_dist( | |
df["compass_angle"], | |
df["computed_compass_angle"] | |
) | |
        # FIXME: Super slow
        # df["is_daytime"] = df.progress_apply(lambda x: is_daytime(x["captured_at"] * 1e-3,
        #                                                           x["computed_geometry.lat"],
        #                                                           x["computed_geometry.long"]),
        #                                      axis="columns", raw=False, engine="python")
        meters_per_pixel = 7
        occupancy_map = calculate_occupancy_map(df, bev_meter_coverage=112,
                                                meters_per_pixel=meters_per_pixel)
        # Compute aggregate stats
        loc_stats = dict()
        loc_stats["num_images"] = len(df)
        # occupied pixels * (meters per pixel)^2 gives m^2; 1e-6 converts to km^2
        loc_stats["area_covered_km2"] = np.sum(occupancy_map) * meters_per_pixel ** 2 * 1e-6
        loc_stats["camera_types"] = set(df["camera_type"].unique())
        loc_stats["camera_makes"] = set(df["make"].unique())
        loc_stats["camera_models"] = set(df["model"].unique())
        all_locs_stats[location] = loc_stats

        # Plot if requested
        if args.plot:
            with PdfPages(location_dir / "stats.pdf") as pdf:
                plt.figure()
                plt.imshow(occupancy_map)
                plt.title(f"{location} occupancy map")
                pdf.savefig()
                plt.close()
                for k in ["make", "model", "camera_type", "loc_descrip",
                          "angle_descrip"]:
                    plt.figure()
                    df[k].hist()
                    plt.title(k)
                    plt.xlabel(k)
                    plt.xticks(rotation=90)
                    plt.ylabel("Count")
                    plt.tight_layout()
                    pdf.savefig()
                    plt.close()

    # Aggregate all stats across locations
    aggregated_stats = dict()
    for loc, loc_stats in all_locs_stats.items():
        for k, v in loc_stats.items():
            if isinstance(v, (float, int)):
                if k not in aggregated_stats:
                    aggregated_stats[k] = v
                else:
                    aggregated_stats[k] += v
            elif isinstance(v, set):
                if k not in aggregated_stats:
                    aggregated_stats[k] = v
                else:
                    aggregated_stats[k] = aggregated_stats[k].union(v)
                # Keep the count in sync regardless of which branch ran
                aggregated_stats[f"{k}_count"] = len(aggregated_stats[k])
            else:
                raise TypeError(f"Aggregating values of type {type(v)} is not supported!")
    all_locs_stats["aggregated"] = aggregated_stats
    print(all_locs_stats)

    # Convert sets to lists so the stats are JSON-serializable
    for loc, loc_stats in all_locs_stats.items():
        for k, v in loc_stats.items():
            if isinstance(v, set):
                loc_stats[k] = list(v)
    if args.output:
        with open(args.output, "w") as f:
            json.dump(all_locs_stats, f, indent=2)
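
# The resulting JSON is illustrated below (the location key "pittsburgh" and
# all values are hypothetical; the keys follow the stats computed above):
#
# {
#   "pittsburgh": {
#     "num_images": 12345,
#     "area_covered_km2": 3.2,
#     "camera_types": ["perspective", "spherical"],
#     "camera_makes": ["..."],
#     "camera_models": ["..."]
#   },
#   "aggregated": {
#     "num_images": 12345,
#     "area_covered_km2": 3.2,
#     "camera_types": ["perspective", "spherical"],
#     "camera_types_count": 2,
#     "...": "..."
#   }
# }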