|
|
|
import logging
|
|
import numpy as np
|
|
from itertools import count
|
|
from typing import List, Tuple
|
|
import torch
|
|
import tqdm
|
|
from fvcore.common.timer import Timer
|
|
|
|
from detectron2.utils import comm
|
|
|
|
from .build import build_batch_data_loader
|
|
from .common import DatasetFromList, MapDataset
|
|
from .samplers import TrainingSampler
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class _EmptyMapDataset(torch.utils.data.Dataset):
|
|
"""
|
|
Map anything to emptiness.
|
|
"""
|
|
|
|
def __init__(self, dataset):
|
|
self.ds = dataset
|
|
|
|
def __len__(self):
|
|
return len(self.ds)
|
|
|
|
def __getitem__(self, idx):
|
|
_ = self.ds[idx]
|
|
return [0]
|
|
|
|
|
|
def iter_benchmark(
    iterator, num_iter: int, warmup: int = 5, max_time_seconds: float = 60
) -> Tuple[float, List[float]]:
    """
    Benchmark an iterator/iterable for `num_iter` iterations with an extra
    `warmup` iterations of warmup.
    End early if `max_time_seconds` time is spent on iterations.

    Args:
        iterator: iterator/iterable to pull items from. It must yield at least
            `warmup` items before timing starts, or `next()` raises StopIteration.
        num_iter (int): number of timed iterations to run.
        warmup (int): number of untimed iterations run before the clock starts.
        max_time_seconds (float): stop iterating once this much time has been
            spent in the timed loop.

    Returns:
        float: average time (seconds) per iteration
        list[float]: time spent on each iteration. Sometimes useful for further analysis.
    """
    num_iter, warmup = int(num_iter), int(warmup)

    iterator = iter(iterator)
    for _ in range(warmup):
        next(iterator)
    timer = Timer()
    all_times = []
    for curr_iter in tqdm.trange(num_iter):
        start = timer.seconds()
        if start > max_time_seconds:
            # Remember how many iterations were actually timed, so the
            # average below is taken over the completed iterations only.
            num_iter = curr_iter
            break
        next(iterator)
        all_times.append(timer.seconds() - start)
    # Guard against division by zero: `num_iter` is 0 if the caller passed 0,
    # or if the time budget was already exceeded before the first timed
    # iteration completed (curr_iter == 0 at the break above).
    avg = timer.seconds() / max(num_iter, 1)
    return avg, all_times
|
|
|
|
|
|
class DataLoaderBenchmark:
    """
    Some common benchmarks that help understand perf bottleneck of a standard dataloader
    made of dataset, mapper and sampler.
    """

    def __init__(
        self,
        dataset,
        *,
        mapper,
        sampler=None,
        total_batch_size,
        num_workers=0,
        max_time_seconds: int = 90,
    ):
        """
        Args:
            max_time_seconds (int): maximum time to spend for each benchmark
            other args: same as in `build.py:build_detection_train_loader`
        """
        # A plain list is wrapped so it supports (serialized) random access.
        if isinstance(dataset, list):
            dataset = DatasetFromList(dataset, copy=False, serialize=True)
        if sampler is None:
            sampler = TrainingSampler(len(dataset))

        self.dataset = dataset
        self.mapper = mapper
        self.sampler = sampler
        self.total_batch_size = total_batch_size
        self.num_workers = num_workers
        self.max_time_seconds = max_time_seconds
        # Batch size each process is responsible for in a distributed run.
        self.per_gpu_batch_size = self.total_batch_size // comm.get_world_size()

    def _benchmark(self, iterator, num_iter, warmup, msg=None):
        """Time `iterator` under this object's time budget; log when `msg` is given."""
        result = iter_benchmark(iterator, num_iter, warmup, self.max_time_seconds)
        if msg is not None:
            self._log_time(msg, result[0], result[1])
        return result

    def _log_time(self, msg, avg, all_times, distributed=False):
        """Log average throughput and 1/5/95/99th-percentile per-iteration times."""
        # NOTE(review): `interpolation=` is deprecated in NumPy >= 1.22 in
        # favor of `method=`; kept as-is for compatibility with older NumPy.
        percentiles = [
            np.percentile(all_times, k, interpolation="nearest") for k in (1, 5, 95, 99)
        ]

        def _fmt(prefix, mean, pct):
            # Shared message template for both the local and per-GPU lines.
            return (
                f"{prefix}: avg={1.0 / mean:.1f} it/s, "
                f"p1={pct[0]:.2g}s, p5={pct[1]:.2g}s, "
                f"p95={pct[2]:.2g}s, p99={pct[3]:.2g}s."
            )

        if not distributed:
            logger.info(_fmt(msg, avg, percentiles))
            return
        # Gather every rank's stats, then let rank 0 print one line per GPU.
        all_avgs = comm.all_gather(avg)
        all_percentiles = comm.all_gather(percentiles)
        if comm.get_rank() > 0:
            return
        for rank, (rank_avg, rank_pct) in enumerate(zip(all_avgs, all_percentiles)):
            logger.info(_fmt(f"GPU{rank} {msg}", rank_avg, rank_pct))

    def benchmark_dataset(self, num_iter, warmup=5):
        """
        Benchmark the speed of taking raw samples from the dataset.
        """

        def raw_sample_stream():
            # Endless stream of raw (unmapped) samples in sampler order.
            while True:
                for idx in self.sampler:
                    yield self.dataset[idx]

        self._benchmark(raw_sample_stream(), num_iter, warmup, "Dataset Alone")

    def benchmark_mapper(self, num_iter, warmup=5):
        """
        Benchmark the speed of taking raw samples from the dataset and map
        them in a single process.
        """

        def mapped_sample_stream():
            # Endless stream of mapped samples, all in the current process.
            while True:
                for idx in self.sampler:
                    yield self.mapper(self.dataset[idx])

        self._benchmark(
            mapped_sample_stream(), num_iter, warmup, "Single Process Mapper (sec/sample)"
        )

    def benchmark_workers(self, num_iter, warmup=10):
        """
        Benchmark the dataloader by tuning num_workers to [0, 1, self.num_workers].
        """
        worker_counts = [0, 1]
        if self.num_workers not in worker_counts:
            worker_counts.append(self.num_workers)

        mapped = MapDataset(self.dataset, self.mapper)
        for workers in worker_counts:
            loader = build_batch_data_loader(
                mapped,
                self.sampler,
                self.total_batch_size,
                num_workers=workers,
            )
            # Scale iteration counts by worker count so each worker process
            # is exercised for roughly the same number of batches.
            scale = max(workers, 1)
            self._benchmark(
                iter(loader),
                num_iter * scale,
                warmup * scale,
                f"DataLoader ({workers} workers, bs={self.per_gpu_batch_size})",
            )
            del loader

    def benchmark_IPC(self, num_iter, warmup=10):
        """
        Benchmark the dataloader where each worker outputs nothing. This
        eliminates the IPC overhead compared to the regular dataloader.

        PyTorch multiprocessing's IPC only optimizes for torch tensors.
        Large numpy arrays or other data structure may incur large IPC overhead.
        """
        workers = self.num_workers
        # Workers do full loading+mapping but send back only a constant stub.
        empty_ds = _EmptyMapDataset(MapDataset(self.dataset, self.mapper))
        loader = build_batch_data_loader(
            empty_ds, self.sampler, self.total_batch_size, num_workers=workers
        )
        scale = max(workers, 1)
        self._benchmark(
            iter(loader),
            num_iter * scale,
            warmup * scale,
            f"DataLoader ({workers} workers, bs={self.per_gpu_batch_size}) w/o comm",
        )

    def benchmark_distributed(self, num_iter, warmup=10):
        """
        Benchmark the dataloader in each distributed worker, and log results of
        all workers. This helps understand the final performance as well as
        the variances among workers.

        It also prints startup time (first iter) of the dataloader.
        """
        num_gpus = comm.get_world_size()
        workers = self.num_workers
        mapped = MapDataset(self.dataset, self.mapper)
        loader = build_batch_data_loader(
            mapped, self.sampler, self.total_batch_size, num_workers=workers
        )

        # First iteration includes worker spawn + first batch: report separately.
        timer = Timer()
        loader = iter(loader)
        next(loader)
        logger.info("Dataloader startup time: {:.2f} seconds".format(timer.seconds()))

        comm.synchronize()

        scale = max(workers, 1)
        avg, all_times = self._benchmark(loader, num_iter * scale, warmup * scale)
        del loader
        self._log_time(
            f"DataLoader ({num_gpus} GPUs x {workers} workers, total bs={self.total_batch_size})",
            avg,
            all_times,
            True,
        )
|
|
|