Example usage:
python3.9 -m mapper.data.debug.calc_stats -d /ocean/projects/cis220039p/shared/map_perception/dataset_v0
import datetime
from datetime import datetime, timezone, timedelta
import time
import argparse
import os
from pathlib import Path
import json
from astral import LocationInfo
from astral.sun import sun
from timezonefinder import TimezoneFinder
import numpy as np
import pandas as pd
import geopandas as gpd
from pyproj.transformer import Transformer
from matplotlib import pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import tqdm
from ..fpv import filters
from .. import logger
def is_daytime(timestamp, latitude, longitude):
# Create a LocationInfo object for the given latitude and longitude
tz_str = TimezoneFinder().timezone_at(lng=longitude, lat=latitude)
location = LocationInfo(name="", region="", timezone=tz_str,
latitude=latitude, longitude=longitude)
# Convert the timestamp to a datetime object
dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
# We query one day before and one day after to avoid timezone ambiguities
# Our query timestamp is guaranteed to fall into one of those 3 dates.
# Astral sometimes returns sunrise or sunsets that are not from the same query date
# Refer to this https://github.com/sffjunkie/astral/issues/83
d0 = (dt - timedelta(days=1)).date()
d1 = dt.date()
d2 = (dt + timedelta(days=1)).date()
# Calculate sunrise and sunset times
times = list()
for d in [d0, d1, d2]:
s = sun(location.observer, date=d)
sunrise = s['sunrise']
sunset = s['sunset']
times.append((sunrise, "sunrise"))
times.append((sunset, 'sunset'))
# Need to sort because there is no particular order
# where sunrise is always before sunset or vice versa
times = sorted(times, key=lambda x: x[0])
assert times[-1][0] > dt > times[0][0]
for i in range(1, len(times)):
if dt < times[i][0]:
prev_event = times[i-1][1]
return prev_event == "sunrise"
def calculate_occupancy_map(df: pd.DataFrame, bev_meter_coverage=112, meters_per_pixel=112):
bev_meter_coverage: How much did the BEVs in the dataframe cover in meters
meters_per_pixel: At what resolution should we initialize the occupancy map.
This need not be the same resolution as the BEV. That would be unnecessarilly slow but most accurate.
# convert pandas dataframe to geopandas dataframe
gdf = gpd.GeoDataFrame(df,
utm_crs = gdf.estimate_utm_crs()
gdf_utm = gdf.to_crs(utm_crs)
left = gdf_utm.geometry.x.min() - bev_meter_coverage
right = gdf_utm.geometry.x.max() + bev_meter_coverage
bottom = gdf_utm.geometry.y.min() - bev_meter_coverage
top = gdf_utm.geometry.y.max() + bev_meter_coverage
width = right - left
height = top - bottom
width_pixels = int(width // meters_per_pixel)
height_pixels = int(height // meters_per_pixel)
if bev_meter_coverage % meters_per_pixel != 0:
logger.warn(f"bev_meter_coverage {bev_meter_coverage} is not divisble by meters_per_pixel "
f"{meters_per_pixel}. Occupancy may be overestimated.")
bev_pixels = int(np.ceil(bev_meter_coverage / meters_per_pixel))
logger.info(f"Initializing {height_pixels}x{width_pixels} occupancy map. Using {bev_pixels}x{bev_pixels} pixels for each BEV.")
map = np.zeros((height_pixels, width_pixels), dtype=bool)
for row in gdf_utm.itertuples():
utm_x = row.geometry.x
utm_y = row.geometry.y
img_x = int((utm_x - left) // meters_per_pixel)
img_y = int((utm_y - bottom) // meters_per_pixel)
bev_pixels_left = bev_pixels // 2
bev_pixels_right = bev_pixels - bev_pixels_left
map[img_y - bev_pixels_left: img_y + bev_pixels_right,
img_x - bev_pixels_left: img_x + bev_pixels_right] = True
return map
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--dataset_dir", '-d', type=str, required=True, help="Dataset directory")
parser.add_argument("--locations", '-l', type=str, default="all",
help="Location names in CSV format. Set to 'all' to traverse all locations.")
parser.add_argument("--plot", action="store_true", help="Store plots per location in PDFs")
parser.add_argument("--output", "-o", default=None, type=str, help="output json file to store statistics")
args = parser.parse_args()
locations = list()
if args.locations.lower() == "all":
locations = os.listdir(args.dataset_dir)
locations = [l for l in locations if os.path.isdir(os.path.join(args.dataset_dir, l))]
locations = args.locations.split(",")
logger.info(f"Parsing {len(locations)} locations..")
all_locs_stats = dict()
for location in tqdm.tqdm(locations):
dataset_dir = Path(args.dataset_dir)
location_dir = dataset_dir / location
bev_dir = location_dir / "bev_raw"
semantic_mask_dir = location_dir / "semantic_masks"
osm_cache_dir = location_dir / "osm_cache"
pq_name = 'image_metadata_filtered_processed.parquet'
df = pd.read_parquet(location_dir / pq_name)
df = df[df["computed_geometry.lat"].notna()]
df = df[df["computed_geometry.long"].notna()]
logger.info(f"Loaded {df.shape[0]} image metadata from {location}")
# Calc derrivative attributes
df["loc_descrip"] = filters.haversine_np(
lon1=df["geometry.long"], lat1=df["geometry.lat"],
lon2=df["computed_geometry.long"], lat2=df["computed_geometry.lat"]
df["angle_descrip"] = filters.angle_dist(
# FIXME: Super slow
# df["is_daytime"] = df.progress_apply(lambda x: is_daytime(x["captured_at"]*1e-3,
# x["computed_geometry.lat"],
# x["computed_geometry.long"]),
# axis="columns", raw=False, engine="python")
meters_per_pixel = 7
map = calculate_occupancy_map(df, bev_meter_coverage=112,
# Calc aggregate stats
loc_stats = dict()
loc_stats["num_images"] = len(df)
loc_stats["area_covered_km2"] = np.sum(map) * meters_per_pixel ** 2 * 1e-6
loc_stats["camera_types"] = set(df["camera_type"].unique())
loc_stats["camera_makes"] = set(df["make"].unique())
loc_stats["camera_model"] = set(df["model"].unique())
all_locs_stats[location] = loc_stats
# Plot if requested
if args.plot:
with PdfPages(location_dir / "stats.pdf") as pdf:
plt.title(f"{location} occupancy map")
for k in ["make", "model", "camera_type", "loc_descrip",
# Aggregate all stats
aggregated_stats = dict()
for loc, loc_stats in all_locs_stats.items():
for k,v in loc_stats.items():
if isinstance(v, float) or isinstance(v, int):
if k not in aggregated_stats.keys():
aggregated_stats[k] = v
aggregated_stats[k] += v
elif isinstance(v, set):
if k not in aggregated_stats.keys():
aggregated_stats[k] = v
aggregated_stats[k] = aggregated_stats[k].union(v)
aggregated_stats[f"{k}_count"] = len(aggregated_stats[k])
raise Exception(f"{v} is not supported !")
all_locs_stats["aggregated"] = aggregated_stats
# Store for json
for loc, loc_stats in all_locs_stats.items():
for k,v in loc_stats.items():
if isinstance(v, set):
loc_stats[k] = list(v)
if args.output:
with open(args.output, "w") as f:
json.dump(all_locs_stats, f, indent=2)