# https://github.com/XPixelGroup/BasicSR/blob/master/basicsr/data/degradations.py
# Copyright (c) OpenMMLab. All rights reserved.
# https://github.com/open-mmlab/mmcv/blob/master/mmcv/fileio/file_client.py
import math
import random
import re
from abc import ABCMeta, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Mapping, Optional, Union

import cv2
import numpy as np
import torch
from PIL import Image
from scipy import special
from scipy.stats import multivariate_normal
from torch import Tensor
# from torchvision.transforms.functional_tensor import rgb_to_grayscale
from torchvision.transforms._functional_tensor import rgb_to_grayscale

# -------------------------------------------------------------------- #
# --------------------------- blur kernels --------------------------- #
# -------------------------------------------------------------------- #
# --------------------------- util functions --------------------------- #
def sigma_matrix2(sig_x, sig_y, theta):
    """Calculate the rotated sigma matrix (two dimensional matrix).

    Args:
        sig_x (float): Standard deviation along the horizontal direction.
        sig_y (float): Standard deviation along the vertical direction.
        theta (float): Rotation angle in radians.

    Returns:
        ndarray: Rotated sigma matrix.
    """
    d_matrix = np.array([[sig_x ** 2, 0], [0, sig_y ** 2]])
    u_matrix = np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]])
    return np.dot(u_matrix, np.dot(d_matrix, u_matrix.T))
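# Illustrative usage sketch (values are hypothetical, not from the original file): covariance
# of a Gaussian that is wider along x (sigma_x=3) than y (sigma_y=1), rotated by 45 degrees.
#   sigma = sigma_matrix2(sig_x=3.0, sig_y=1.0, theta=math.pi / 4)  # 2x2 ndarray, U diag(9, 1) U^T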
def mesh_grid(kernel_size):
    """Generate the mesh grid, centering at zero.

    Args:
        kernel_size (int): Side length of the (square) kernel.

    Returns:
        xy (ndarray): with the shape (kernel_size, kernel_size, 2)
        xx (ndarray): with the shape (kernel_size, kernel_size)
        yy (ndarray): with the shape (kernel_size, kernel_size)
    """
    ax = np.arange(-kernel_size // 2 + 1., kernel_size // 2 + 1.)
    xx, yy = np.meshgrid(ax, ax)
    xy = np.hstack((xx.reshape((kernel_size * kernel_size, 1)), yy.reshape(kernel_size * kernel_size,
                                                                           1))).reshape(kernel_size, kernel_size, 2)
    return xy, xx, yy
def pdf2(sigma_matrix, grid):
    """Calculate PDF of the bivariate Gaussian distribution.

    Args:
        sigma_matrix (ndarray): with the shape (2, 2)
        grid (ndarray): generated by :func:`mesh_grid`,
            with the shape (K, K, 2), K is the kernel size.

    Returns:
        kernel (ndarray): un-normalized kernel.
    """
    inverse_sigma = np.linalg.inv(sigma_matrix)
    kernel = np.exp(-0.5 * np.sum(np.dot(grid, inverse_sigma) * grid, 2))
    return kernel
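# Illustrative usage sketch (values are hypothetical): evaluate an un-normalized anisotropic
# Gaussian on a 21x21 grid.
#   grid, _, _ = mesh_grid(21)              # (21, 21, 2) coordinates centered at zero
#   sigma = sigma_matrix2(2.0, 1.0, 0.3)
#   kernel = pdf2(sigma, grid)              # un-normalized; divide by kernel.sum() to normalize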
def cdf2(d_matrix, grid):
    """Calculate the CDF of the standard bivariate Gaussian distribution.

    Used in skewed Gaussian distribution.

    Args:
        d_matrix (ndarray): skew matrix.
        grid (ndarray): generated by :func:`mesh_grid`,
            with the shape (K, K, 2), K is the kernel size.

    Returns:
        cdf (ndarray): skewed cdf.
    """
    rv = multivariate_normal([0, 0], [[1, 0], [0, 1]])
    grid = np.dot(grid, d_matrix)
    cdf = rv.cdf(grid)
    return cdf
def bivariate_Gaussian(kernel_size, sig_x, sig_y, theta, grid=None, isotropic=True):
    """Generate a bivariate isotropic or anisotropic Gaussian kernel.

    In the isotropic mode, only `sig_x` is used; `sig_y` and `theta` are ignored.

    Args:
        kernel_size (int): Side length of the kernel.
        sig_x (float): Standard deviation along the horizontal direction.
        sig_y (float): Standard deviation along the vertical direction.
        theta (float): Rotation angle in radians.
        grid (ndarray, optional): generated by :func:`mesh_grid`,
            with the shape (K, K, 2), K is the kernel size. Default: None
        isotropic (bool): Whether to generate an isotropic kernel. Default: True.

    Returns:
        kernel (ndarray): normalized kernel.
    """
    if grid is None:
        grid, _, _ = mesh_grid(kernel_size)
    if isotropic:
        sigma_matrix = np.array([[sig_x ** 2, 0], [0, sig_x ** 2]])
    else:
        sigma_matrix = sigma_matrix2(sig_x, sig_y, theta)
    kernel = pdf2(sigma_matrix, grid)
    kernel = kernel / np.sum(kernel)
    return kernel
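# Illustrative usage sketch (not part of the original module; parameter values are hypothetical):
# blur an image with a bivariate Gaussian kernel. `img` is assumed to be a float32 HxWxC array in [0, 1].
def _demo_gaussian_blur(img, kernel_size=21, sig_x=2.0, sig_y=1.0, theta=0.0, isotropic=False):
    kernel = bivariate_Gaussian(kernel_size, sig_x, sig_y, theta, isotropic=isotropic)
    return cv2.filter2D(img, -1, kernel)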
def bivariate_generalized_Gaussian(kernel_size, sig_x, sig_y, theta, beta, grid=None, isotropic=True):
    """Generate a bivariate generalized Gaussian kernel.

    ``Paper: Parameter Estimation For Multivariate Generalized Gaussian Distributions``

    In the isotropic mode, only `sig_x` is used; `sig_y` and `theta` are ignored.

    Args:
        kernel_size (int): Side length of the kernel.
        sig_x (float): Standard deviation along the horizontal direction.
        sig_y (float): Standard deviation along the vertical direction.
        theta (float): Rotation angle in radians.
        beta (float): shape parameter, beta = 1 is the normal distribution.
        grid (ndarray, optional): generated by :func:`mesh_grid`,
            with the shape (K, K, 2), K is the kernel size. Default: None
        isotropic (bool): Whether to generate an isotropic kernel. Default: True.

    Returns:
        kernel (ndarray): normalized kernel.
    """
    if grid is None:
        grid, _, _ = mesh_grid(kernel_size)
    if isotropic:
        sigma_matrix = np.array([[sig_x ** 2, 0], [0, sig_x ** 2]])
    else:
        sigma_matrix = sigma_matrix2(sig_x, sig_y, theta)
    inverse_sigma = np.linalg.inv(sigma_matrix)
    kernel = np.exp(-0.5 * np.power(np.sum(np.dot(grid, inverse_sigma) * grid, 2), beta))
    kernel = kernel / np.sum(kernel)
    return kernel
def bivariate_plateau(kernel_size, sig_x, sig_y, theta, beta, grid=None, isotropic=True):
    """Generate a plateau-like anisotropic kernel.

    1 / (1 + x^(beta))

    Reference: https://stats.stackexchange.com/questions/203629/is-there-a-plateau-shaped-distribution

    In the isotropic mode, only `sig_x` is used; `sig_y` and `theta` are ignored.

    Args:
        kernel_size (int): Side length of the kernel.
        sig_x (float): Standard deviation along the horizontal direction.
        sig_y (float): Standard deviation along the vertical direction.
        theta (float): Rotation angle in radians.
        beta (float): shape parameter of the plateau profile.
        grid (ndarray, optional): generated by :func:`mesh_grid`,
            with the shape (K, K, 2), K is the kernel size. Default: None
        isotropic (bool): Whether to generate an isotropic kernel. Default: True.

    Returns:
        kernel (ndarray): normalized kernel.
    """
    if grid is None:
        grid, _, _ = mesh_grid(kernel_size)
    if isotropic:
        sigma_matrix = np.array([[sig_x ** 2, 0], [0, sig_x ** 2]])
    else:
        sigma_matrix = sigma_matrix2(sig_x, sig_y, theta)
    inverse_sigma = np.linalg.inv(sigma_matrix)
    kernel = np.reciprocal(np.power(np.sum(np.dot(grid, inverse_sigma) * grid, 2), beta) + 1)
    kernel = kernel / np.sum(kernel)
    return kernel
def random_bivariate_Gaussian(kernel_size,
                              sigma_x_range,
                              sigma_y_range,
                              rotation_range,
                              noise_range=None,
                              isotropic=True):
    """Randomly generate bivariate isotropic or anisotropic Gaussian kernels.

    In the isotropic mode, only `sigma_x_range` is used; `sigma_y_range` and `rotation_range` are ignored.

    Args:
        kernel_size (int): Side length of the kernel, must be odd.
        sigma_x_range (tuple): e.g. [0.6, 5]
        sigma_y_range (tuple): e.g. [0.6, 5]
        rotation_range (tuple): e.g. [-math.pi, math.pi]
        noise_range (tuple, optional): multiplicative kernel noise,
            e.g. [0.75, 1.25]. Default: None
        isotropic (bool): Whether to generate an isotropic kernel. Default: True.

    Returns:
        kernel (ndarray): normalized kernel.
    """
    assert kernel_size % 2 == 1, 'Kernel size must be an odd number.'
    assert sigma_x_range[0] < sigma_x_range[1], 'Wrong sigma_x_range.'
    sigma_x = np.random.uniform(sigma_x_range[0], sigma_x_range[1])
    if isotropic is False:
        assert sigma_y_range[0] < sigma_y_range[1], 'Wrong sigma_y_range.'
        assert rotation_range[0] < rotation_range[1], 'Wrong rotation_range.'
        sigma_y = np.random.uniform(sigma_y_range[0], sigma_y_range[1])
        rotation = np.random.uniform(rotation_range[0], rotation_range[1])
    else:
        sigma_y = sigma_x
        rotation = 0
    kernel = bivariate_Gaussian(kernel_size, sigma_x, sigma_y, rotation, isotropic=isotropic)
    # add multiplicative noise
    if noise_range is not None:
        assert noise_range[0] < noise_range[1], 'Wrong noise range.'
        noise = np.random.uniform(noise_range[0], noise_range[1], size=kernel.shape)
        kernel = kernel * noise
    kernel = kernel / np.sum(kernel)
    return kernel
def random_bivariate_generalized_Gaussian(kernel_size,
                                          sigma_x_range,
                                          sigma_y_range,
                                          rotation_range,
                                          beta_range,
                                          noise_range=None,
                                          isotropic=True):
    """Randomly generate bivariate generalized Gaussian kernels.

    In the isotropic mode, only `sigma_x_range` is used; `sigma_y_range` and `rotation_range` are ignored.

    Args:
        kernel_size (int): Side length of the kernel, must be odd.
        sigma_x_range (tuple): e.g. [0.6, 5]
        sigma_y_range (tuple): e.g. [0.6, 5]
        rotation_range (tuple): e.g. [-math.pi, math.pi]
        beta_range (tuple): e.g. [0.5, 8]
        noise_range (tuple, optional): multiplicative kernel noise,
            e.g. [0.75, 1.25]. Default: None
        isotropic (bool): Whether to generate an isotropic kernel. Default: True.

    Returns:
        kernel (ndarray): normalized kernel.
    """
    assert kernel_size % 2 == 1, 'Kernel size must be an odd number.'
    assert sigma_x_range[0] < sigma_x_range[1], 'Wrong sigma_x_range.'
    sigma_x = np.random.uniform(sigma_x_range[0], sigma_x_range[1])
    if isotropic is False:
        assert sigma_y_range[0] < sigma_y_range[1], 'Wrong sigma_y_range.'
        assert rotation_range[0] < rotation_range[1], 'Wrong rotation_range.'
        sigma_y = np.random.uniform(sigma_y_range[0], sigma_y_range[1])
        rotation = np.random.uniform(rotation_range[0], rotation_range[1])
    else:
        sigma_y = sigma_x
        rotation = 0
    # assume beta_range[0] < 1 < beta_range[1]
    if np.random.uniform() < 0.5:
        beta = np.random.uniform(beta_range[0], 1)
    else:
        beta = np.random.uniform(1, beta_range[1])
    kernel = bivariate_generalized_Gaussian(kernel_size, sigma_x, sigma_y, rotation, beta, isotropic=isotropic)
    # add multiplicative noise
    if noise_range is not None:
        assert noise_range[0] < noise_range[1], 'Wrong noise range.'
        noise = np.random.uniform(noise_range[0], noise_range[1], size=kernel.shape)
        kernel = kernel * noise
    kernel = kernel / np.sum(kernel)
    return kernel
def random_bivariate_plateau(kernel_size,
                             sigma_x_range,
                             sigma_y_range,
                             rotation_range,
                             beta_range,
                             noise_range=None,
                             isotropic=True):
    """Randomly generate bivariate plateau kernels.

    In the isotropic mode, only `sigma_x_range` is used; `sigma_y_range` and `rotation_range` are ignored.

    Args:
        kernel_size (int): Side length of the kernel, must be odd.
        sigma_x_range (tuple): e.g. [0.6, 5]
        sigma_y_range (tuple): e.g. [0.6, 5]
        rotation_range (tuple): e.g. [-math.pi/2, math.pi/2]
        beta_range (tuple): e.g. [1, 4]
        noise_range (tuple, optional): multiplicative kernel noise,
            e.g. [0.75, 1.25]. Default: None
        isotropic (bool): Whether to generate an isotropic kernel. Default: True.

    Returns:
        kernel (ndarray): normalized kernel.
    """
    assert kernel_size % 2 == 1, 'Kernel size must be an odd number.'
    assert sigma_x_range[0] < sigma_x_range[1], 'Wrong sigma_x_range.'
    sigma_x = np.random.uniform(sigma_x_range[0], sigma_x_range[1])
    if isotropic is False:
        assert sigma_y_range[0] < sigma_y_range[1], 'Wrong sigma_y_range.'
        assert rotation_range[0] < rotation_range[1], 'Wrong rotation_range.'
        sigma_y = np.random.uniform(sigma_y_range[0], sigma_y_range[1])
        rotation = np.random.uniform(rotation_range[0], rotation_range[1])
    else:
        sigma_y = sigma_x
        rotation = 0
    # TODO: this may be not proper
    if np.random.uniform() < 0.5:
        beta = np.random.uniform(beta_range[0], 1)
    else:
        beta = np.random.uniform(1, beta_range[1])
    kernel = bivariate_plateau(kernel_size, sigma_x, sigma_y, rotation, beta, isotropic=isotropic)
    # add multiplicative noise
    if noise_range is not None:
        assert noise_range[0] < noise_range[1], 'Wrong noise range.'
        noise = np.random.uniform(noise_range[0], noise_range[1], size=kernel.shape)
        kernel = kernel * noise
    kernel = kernel / np.sum(kernel)
    return kernel
def random_mixed_kernels(kernel_list,
                         kernel_prob,
                         kernel_size=21,
                         sigma_x_range=(0.6, 5),
                         sigma_y_range=(0.6, 5),
                         rotation_range=(-math.pi, math.pi),
                         betag_range=(0.5, 8),
                         betap_range=(0.5, 8),
                         noise_range=None):
    """Randomly generate mixed kernels.

    Args:
        kernel_list (tuple): a list of kernel type names; the supported types are
            ['iso', 'aniso', 'generalized_iso', 'generalized_aniso',
            'plateau_iso', 'plateau_aniso']
        kernel_prob (tuple): corresponding kernel probability for each
            kernel type
        kernel_size (int): Side length of the kernel, must be odd. Default: 21.
        sigma_x_range (tuple): e.g. [0.6, 5]
        sigma_y_range (tuple): e.g. [0.6, 5]
        rotation_range (tuple): e.g. [-math.pi, math.pi]
        betag_range (tuple): shape range for the generalized Gaussian kernels, e.g. [0.5, 8]
        betap_range (tuple): shape range for the plateau kernels, e.g. [0.5, 8]
        noise_range (tuple, optional): multiplicative kernel noise,
            e.g. [0.75, 1.25]. Default: None

    Returns:
        kernel (ndarray): normalized kernel.
    """
    kernel_type = random.choices(kernel_list, kernel_prob)[0]
    if kernel_type == 'iso':
        kernel = random_bivariate_Gaussian(
            kernel_size, sigma_x_range, sigma_y_range, rotation_range, noise_range=noise_range, isotropic=True)
    elif kernel_type == 'aniso':
        kernel = random_bivariate_Gaussian(
            kernel_size, sigma_x_range, sigma_y_range, rotation_range, noise_range=noise_range, isotropic=False)
    elif kernel_type == 'generalized_iso':
        kernel = random_bivariate_generalized_Gaussian(
            kernel_size,
            sigma_x_range,
            sigma_y_range,
            rotation_range,
            betag_range,
            noise_range=noise_range,
            isotropic=True)
    elif kernel_type == 'generalized_aniso':
        kernel = random_bivariate_generalized_Gaussian(
            kernel_size,
            sigma_x_range,
            sigma_y_range,
            rotation_range,
            betag_range,
            noise_range=noise_range,
            isotropic=False)
    elif kernel_type == 'plateau_iso':
        kernel = random_bivariate_plateau(
            kernel_size, sigma_x_range, sigma_y_range, rotation_range, betap_range, noise_range=None, isotropic=True)
    elif kernel_type == 'plateau_aniso':
        kernel = random_bivariate_plateau(
            kernel_size, sigma_x_range, sigma_y_range, rotation_range, betap_range, noise_range=None, isotropic=False)
    return kernel
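# Illustrative usage sketch (all values are hypothetical, not taken from this file): draw one
# random kernel from a mixture of kernel types, weighted by `kernel_prob`.
#   kernel = random_mixed_kernels(
#       kernel_list=['iso', 'aniso', 'generalized_iso', 'generalized_aniso', 'plateau_iso', 'plateau_aniso'],
#       kernel_prob=[0.45, 0.25, 0.12, 0.03, 0.12, 0.03],
#       kernel_size=21,
#       sigma_x_range=(0.2, 3.0),
#       sigma_y_range=(0.2, 3.0),
#       rotation_range=(-math.pi, math.pi),
#       betag_range=(0.5, 4.0),
#       betap_range=(1.0, 2.0),
#       noise_range=None)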
# Suppress the divide-by-zero / invalid-value warnings raised at the center of the sinc
# kernel below; the center value is overwritten with its analytic limit afterwards.
np.seterr(divide='ignore', invalid='ignore')


def circular_lowpass_kernel(cutoff, kernel_size, pad_to=0):
    """2D sinc filter.

    Reference: https://dsp.stackexchange.com/questions/58301/2-d-circularly-symmetric-low-pass-filter

    Args:
        cutoff (float): cutoff frequency in radians (pi is max)
        kernel_size (int): horizontal and vertical size, must be odd.
        pad_to (int): pad kernel size to desired size, must be odd or zero.
    """
    assert kernel_size % 2 == 1, 'Kernel size must be an odd number.'
    kernel = np.fromfunction(
        lambda x, y: cutoff * special.j1(cutoff * np.sqrt(
            (x - (kernel_size - 1) / 2) ** 2 + (y - (kernel_size - 1) / 2) ** 2)) / (2 * np.pi * np.sqrt(
                (x - (kernel_size - 1) / 2) ** 2 + (y - (kernel_size - 1) / 2) ** 2)), [kernel_size, kernel_size])
    kernel[(kernel_size - 1) // 2, (kernel_size - 1) // 2] = cutoff ** 2 / (4 * np.pi)
    kernel = kernel / np.sum(kernel)
    if pad_to > kernel_size:
        pad_size = (pad_to - kernel_size) // 2
        kernel = np.pad(kernel, ((pad_size, pad_size), (pad_size, pad_size)))
    return kernel
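# Illustrative usage sketch (not part of the original module; cutoff and sizes are hypothetical):
# a 2D sinc kernel is commonly used to simulate ringing/overshoot artifacts.
#   sinc_kernel = circular_lowpass_kernel(cutoff=np.pi / 3, kernel_size=21, pad_to=0)
#   ringed = cv2.filter2D(img, -1, sinc_kernel)  # `img`: float32 HxWxC array in [0, 1]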
# ------------------------------------------------------------- #
# --------------------------- noise --------------------------- #
# ------------------------------------------------------------- #
# ----------------------- Gaussian Noise ----------------------- #
def instantiate_from_config(config: Mapping[str, Any]) -> Any:
    if "target" not in config:
        raise KeyError("Expected key `target` to instantiate.")
    return get_obj_from_str(config["target"])(**config.get("params", dict()))
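# `get_obj_from_str` is called by `instantiate_from_config` above but is not defined in this
# excerpt. A minimal sketch is given below, assuming the usual "package.module.Name" dotted
# path convention (mirrors the common latent-diffusion helper; adapt if the project ships its own).
import importlib


def get_obj_from_str(string: str) -> Any:
    # Resolve a dotted path such as "package.module.ClassName" to the object it names.
    module, cls = string.rsplit(".", 1)
    return getattr(importlib.import_module(module), cls)


# Illustrative usage (the target path and params are hypothetical):
#   dataset = instantiate_from_config({
#       "target": "my_project.data.MyDataset",
#       "params": {"file_list": "train_list.txt"},
#   })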
class BaseStorageBackend(metaclass=ABCMeta):
    """Abstract class of storage backends.

    All backends need to implement ``get()``, which reads the file as a byte
    stream.
    """

    def name(self) -> str:
        return self.__class__.__name__

    @abstractmethod
    def get(self, filepath: str) -> bytes:
        pass
class PetrelBackend(BaseStorageBackend):
    """Petrel storage backend (for internal use).

    PetrelBackend supports reading and writing data to multiple clusters.
    If the file path contains the cluster name, PetrelBackend will read data
    from the specified cluster or write data to it. Otherwise, PetrelBackend
    will access the default cluster.

    Args:
        path_mapping (dict, optional): Path mapping dict from local path to
            Petrel path. When ``path_mapping={'src': 'dst'}``, ``src`` in
            ``filepath`` will be replaced by ``dst``. Default: None.
        enable_mc (bool, optional): Whether to enable memcached support.
            Default: False.
        conf_path (str, optional): Config path of Petrel client. Default: None.
            `New in version 1.7.1`.

    Examples:
        >>> filepath1 = 's3://path/of/file'
        >>> filepath2 = 'cluster-name:s3://path/of/file'
        >>> client = PetrelBackend()
        >>> client.get(filepath1)  # get data from the default cluster
        >>> client.get(filepath2)  # get data from the 'cluster-name' cluster
    """

    def __init__(self,
                 path_mapping: Optional[dict] = None,
                 enable_mc: bool = False,
                 conf_path: Optional[str] = None):
        try:
            from petrel_client import client
        except ImportError:
            raise ImportError('Please install petrel_client to enable '
                              'PetrelBackend.')

        self._client = client.Client(conf_path=conf_path, enable_mc=enable_mc)
        assert isinstance(path_mapping, dict) or path_mapping is None
        self.path_mapping = path_mapping

    def _map_path(self, filepath: Union[str, Path]) -> str:
        """Map ``filepath`` to a string path whose prefix will be replaced by
        :attr:`self.path_mapping`.

        Args:
            filepath (str): Path to be mapped.
        """
        filepath = str(filepath)
        if self.path_mapping is not None:
            for k, v in self.path_mapping.items():
                filepath = filepath.replace(k, v, 1)
        return filepath

    def _format_path(self, filepath: str) -> str:
        """Convert a ``filepath`` to the standard format of petrel oss.

        If the ``filepath`` is concatenated by ``os.path.join``, in a Windows
        environment, the ``filepath`` will be in the format of
        's3://bucket_name\\image.jpg'. By invoking :meth:`_format_path`, the
        above ``filepath`` will be converted to 's3://bucket_name/image.jpg'.

        Args:
            filepath (str): Path to be formatted.
        """
        return re.sub(r'\\+', '/', filepath)

    def get(self, filepath: Union[str, Path]) -> bytes:
        """Read data from a given ``filepath`` with 'rb' mode.

        Args:
            filepath (str or Path): Path to read data.

        Returns:
            bytes: The loaded bytes.
        """
        filepath = self._map_path(filepath)
        filepath = self._format_path(filepath)
        value = self._client.Get(filepath)
        return value
class HardDiskBackend(BaseStorageBackend):
    """Raw hard disks storage backend."""

    def get(self, filepath: Union[str, Path]) -> bytes:
        """Read data from a given ``filepath`` with 'rb' mode.

        Args:
            filepath (str or Path): Path to read data.

        Returns:
            bytes: Expected bytes object.
        """
        with open(filepath, 'rb') as f:
            value_buf = f.read()
        return value_buf
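# Illustrative usage sketch (the path is hypothetical): read raw bytes from disk and decode
# them into a float32 BGR image with OpenCV.
#   file_client = HardDiskBackend()
#   img_bytes = file_client.get('path/to/image.png')
#   img = cv2.imdecode(np.frombuffer(img_bytes, np.uint8), cv2.IMREAD_COLOR)
#   img = img.astype(np.float32) / 255.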
def generate_gaussian_noise(img, sigma=10, gray_noise=False):
    """Generate Gaussian noise.

    Args:
        img (Numpy array): Input image, shape (h, w, c), range [0, 1], float32.
        sigma (float): Noise scale (measured in range 255). Default: 10.
        gray_noise (bool): Whether to generate grayscale noise. Default: False.

    Returns:
        (Numpy array): Generated noise map, shape (h, w, c), float32.
    """
    if gray_noise:
        noise = np.float32(np.random.randn(*(img.shape[0:2]))) * sigma / 255.
        noise = np.expand_dims(noise, axis=2).repeat(3, axis=2)
    else:
        noise = np.float32(np.random.randn(*(img.shape))) * sigma / 255.
    return noise
def add_gaussian_noise(img, sigma=10, clip=True, rounds=False, gray_noise=False):
    """Add Gaussian noise.

    Args:
        img (Numpy array): Input image, shape (h, w, c), range [0, 1], float32.
        sigma (float): Noise scale (measured in range 255). Default: 10.
        clip (bool): Whether to clip the output to [0, 1]. Default: True.
        rounds (bool): Whether to quantize the output to the 1/255 grid. Default: False.
        gray_noise (bool): Whether to add grayscale noise. Default: False.

    Returns:
        (Numpy array): Returned noisy image, shape (h, w, c), range [0, 1],
            float32.
    """
    noise = generate_gaussian_noise(img, sigma, gray_noise)
    out = img + noise
    if clip and rounds:
        out = np.clip((out * 255.0).round(), 0, 255) / 255.
    elif clip:
        out = np.clip(out, 0, 1)
    elif rounds:
        out = (out * 255.0).round() / 255.
    return out
def generate_gaussian_noise_pt(img, sigma=10, gray_noise=0):
    """Generate Gaussian noise (PyTorch version).

    Args:
        img (Tensor): Shape (b, c, h, w), range [0, 1], float32.
        sigma (float | Tensor): Noise scale (measured in range 255). Number or
            Tensor with shape (b). Default: 10.
        gray_noise (float | Tensor): 0-1 number or Tensor with shape (b).
            0 for False, 1 for True. Default: 0.

    Returns:
        (Tensor): Generated noise, shape (b, c, h, w), float32.
    """
    b, _, h, w = img.size()
    if not isinstance(sigma, (float, int)):
        sigma = sigma.view(img.size(0), 1, 1, 1)
    if isinstance(gray_noise, (float, int)):
        cal_gray_noise = gray_noise > 0
    else:
        gray_noise = gray_noise.view(b, 1, 1, 1)
        cal_gray_noise = torch.sum(gray_noise) > 0
    if cal_gray_noise:
        noise_gray = torch.randn(*img.size()[2:4], dtype=img.dtype, device=img.device) * sigma / 255.
        noise_gray = noise_gray.view(b, 1, h, w)
    # always calculate color noise
    noise = torch.randn(*img.size(), dtype=img.dtype, device=img.device) * sigma / 255.
    if cal_gray_noise:
        noise = noise * (1 - gray_noise) + noise_gray * gray_noise
    return noise
def add_gaussian_noise_pt(img, sigma=10, gray_noise=0, clip=True, rounds=False):
    """Add Gaussian noise (PyTorch version).

    Args:
        img (Tensor): Shape (b, c, h, w), range [0, 1], float32.
        sigma (float | Tensor): Noise scale (measured in range 255). Number or
            Tensor with shape (b). Default: 10.
        gray_noise (float | Tensor): 0-1 number or Tensor with shape (b).
            0 for False, 1 for True. Default: 0.

    Returns:
        (Tensor): Returned noisy image, shape (b, c, h, w), range [0, 1],
            float32.
    """
    noise = generate_gaussian_noise_pt(img, sigma, gray_noise)
    out = img + noise
    if clip and rounds:
        out = torch.clamp((out * 255.0).round(), 0, 255) / 255.
    elif clip:
        out = torch.clamp(out, 0, 1)
    elif rounds:
        out = (out * 255.0).round() / 255.
    return out
# ----------------------- Random Gaussian Noise ----------------------- #
def random_generate_gaussian_noise(img, sigma_range=(0, 10), gray_prob=0):
    sigma = np.random.uniform(sigma_range[0], sigma_range[1])
    if np.random.uniform() < gray_prob:
        gray_noise = True
    else:
        gray_noise = False
    return generate_gaussian_noise(img, sigma, gray_noise)


def random_add_gaussian_noise(img, sigma_range=(0, 1.0), gray_prob=0, clip=True, rounds=False):
    noise = random_generate_gaussian_noise(img, sigma_range, gray_prob)
    out = img + noise
    if clip and rounds:
        out = np.clip((out * 255.0).round(), 0, 255) / 255.
    elif clip:
        out = np.clip(out, 0, 1)
    elif rounds:
        out = (out * 255.0).round() / 255.
    return out
def random_generate_gaussian_noise_pt(img, sigma_range=(0, 10), gray_prob=0):
    sigma = torch.rand(
        img.size(0), dtype=img.dtype, device=img.device) * (sigma_range[1] - sigma_range[0]) + sigma_range[0]
    gray_noise = torch.rand(img.size(0), dtype=img.dtype, device=img.device)
    gray_noise = (gray_noise < gray_prob).float()
    return generate_gaussian_noise_pt(img, sigma, gray_noise)


def random_add_gaussian_noise_pt(img, sigma_range=(0, 1.0), gray_prob=0, clip=True, rounds=False):
    noise = random_generate_gaussian_noise_pt(img, sigma_range, gray_prob)
    out = img + noise
    if clip and rounds:
        out = torch.clamp((out * 255.0).round(), 0, 255) / 255.
    elif clip:
        out = torch.clamp(out, 0, 1)
    elif rounds:
        out = (out * 255.0).round() / 255.
    return out
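# Illustrative usage sketch (not part of the original module; ranges are hypothetical):
# add random Gaussian noise to a batch of image tensors, with sigma on the 0-255 scale as above.
def _demo_random_gaussian_noise_pt(batch, sigma_range=(1, 30), gray_prob=0.4):
    # `batch`: float32 Tensor of shape (b, c, h, w) in [0, 1].
    return random_add_gaussian_noise_pt(batch, sigma_range=sigma_range, gray_prob=gray_prob, clip=True, rounds=False)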
# ----------------------- Poisson (Shot) Noise ----------------------- #
def generate_poisson_noise(img, scale=1.0, gray_noise=False):
    """Generate poisson noise.

    Reference: https://github.com/scikit-image/scikit-image/blob/main/skimage/util/noise.py#L37-L219

    Args:
        img (Numpy array): Input image, shape (h, w, c), range [0, 1], float32.
        scale (float): Noise scale. Default: 1.0.
        gray_noise (bool): Whether to generate gray noise. Default: False.

    Returns:
        (Numpy array): Generated noise map, shape (h, w, c), float32.
    """
    if gray_noise:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # round and clip image for counting vals correctly
    img = np.clip((img * 255.0).round(), 0, 255) / 255.
    vals = len(np.unique(img))
    vals = 2 ** np.ceil(np.log2(vals))
    out = np.float32(np.random.poisson(img * vals) / float(vals))
    noise = out - img
    if gray_noise:
        noise = np.repeat(noise[:, :, np.newaxis], 3, axis=2)
    return noise * scale
def add_poisson_noise(img, scale=1.0, clip=True, rounds=False, gray_noise=False):
    """Add poisson noise.

    Args:
        img (Numpy array): Input image, shape (h, w, c), range [0, 1], float32.
        scale (float): Noise scale. Default: 1.0.
        gray_noise (bool): Whether to generate gray noise. Default: False.

    Returns:
        (Numpy array): Returned noisy image, shape (h, w, c), range [0, 1],
            float32.
    """
    noise = generate_poisson_noise(img, scale, gray_noise)
    out = img + noise
    if clip and rounds:
        out = np.clip((out * 255.0).round(), 0, 255) / 255.
    elif clip:
        out = np.clip(out, 0, 1)
    elif rounds:
        out = (out * 255.0).round() / 255.
    return out
def generate_poisson_noise_pt(img, scale=1.0, gray_noise=0):
    """Generate a batch of poisson noise (PyTorch version).

    Args:
        img (Tensor): Input image, shape (b, c, h, w), range [0, 1], float32.
        scale (float | Tensor): Noise scale. Number or Tensor with shape (b).
            Default: 1.0.
        gray_noise (float | Tensor): 0-1 number or Tensor with shape (b).
            0 for False, 1 for True. Default: 0.

    Returns:
        (Tensor): Generated noise, shape (b, c, h, w), float32.
    """
    b, _, h, w = img.size()
    if isinstance(gray_noise, (float, int)):
        cal_gray_noise = gray_noise > 0
    else:
        gray_noise = gray_noise.view(b, 1, 1, 1)
        cal_gray_noise = torch.sum(gray_noise) > 0
    if cal_gray_noise:
        img_gray = rgb_to_grayscale(img, num_output_channels=1)
        # round and clip image for counting vals correctly
        img_gray = torch.clamp((img_gray * 255.0).round(), 0, 255) / 255.
        # use for-loop to get the unique values for each sample
        vals_list = [len(torch.unique(img_gray[i, :, :, :])) for i in range(b)]
        vals_list = [2 ** np.ceil(np.log2(vals)) for vals in vals_list]
        vals = img_gray.new_tensor(vals_list).view(b, 1, 1, 1)
        out = torch.poisson(img_gray * vals) / vals
        noise_gray = out - img_gray
        noise_gray = noise_gray.expand(b, 3, h, w)
    # always calculate color noise
    # round and clip image for counting vals correctly
    img = torch.clamp((img * 255.0).round(), 0, 255) / 255.
    # use for-loop to get the unique values for each sample
    vals_list = [len(torch.unique(img[i, :, :, :])) for i in range(b)]
    vals_list = [2 ** np.ceil(np.log2(vals)) for vals in vals_list]
    vals = img.new_tensor(vals_list).view(b, 1, 1, 1)
    out = torch.poisson(img * vals) / vals
    noise = out - img
    if cal_gray_noise:
        noise = noise * (1 - gray_noise) + noise_gray * gray_noise
    if not isinstance(scale, (float, int)):
        scale = scale.view(b, 1, 1, 1)
    return noise * scale
def add_poisson_noise_pt(img, scale=1.0, clip=True, rounds=False, gray_noise=0):
    """Add poisson noise to a batch of images (PyTorch version).

    Args:
        img (Tensor): Input image, shape (b, c, h, w), range [0, 1], float32.
        scale (float | Tensor): Noise scale. Number or Tensor with shape (b).
            Default: 1.0.
        gray_noise (float | Tensor): 0-1 number or Tensor with shape (b).
            0 for False, 1 for True. Default: 0.

    Returns:
        (Tensor): Returned noisy image, shape (b, c, h, w), range [0, 1],
            float32.
    """
    noise = generate_poisson_noise_pt(img, scale, gray_noise)
    out = img + noise
    if clip and rounds:
        out = torch.clamp((out * 255.0).round(), 0, 255) / 255.
    elif clip:
        out = torch.clamp(out, 0, 1)
    elif rounds:
        out = (out * 255.0).round() / 255.
    return out
# ----------------------- Random Poisson (Shot) Noise ----------------------- #
def random_generate_poisson_noise(img, scale_range=(0, 1.0), gray_prob=0):
    scale = np.random.uniform(scale_range[0], scale_range[1])
    if np.random.uniform() < gray_prob:
        gray_noise = True
    else:
        gray_noise = False
    return generate_poisson_noise(img, scale, gray_noise)


def random_add_poisson_noise(img, scale_range=(0, 1.0), gray_prob=0, clip=True, rounds=False):
    noise = random_generate_poisson_noise(img, scale_range, gray_prob)
    out = img + noise
    if clip and rounds:
        out = np.clip((out * 255.0).round(), 0, 255) / 255.
    elif clip:
        out = np.clip(out, 0, 1)
    elif rounds:
        out = (out * 255.0).round() / 255.
    return out


def random_generate_poisson_noise_pt(img, scale_range=(0, 1.0), gray_prob=0):
    scale = torch.rand(
        img.size(0), dtype=img.dtype, device=img.device) * (scale_range[1] - scale_range[0]) + scale_range[0]
    gray_noise = torch.rand(img.size(0), dtype=img.dtype, device=img.device)
    gray_noise = (gray_noise < gray_prob).float()
    return generate_poisson_noise_pt(img, scale, gray_noise)


def random_add_poisson_noise_pt(img, scale_range=(0, 1.0), gray_prob=0, clip=True, rounds=False):
    noise = random_generate_poisson_noise_pt(img, scale_range, gray_prob)
    out = img + noise
    if clip and rounds:
        out = torch.clamp((out * 255.0).round(), 0, 255) / 255.
    elif clip:
        out = torch.clamp(out, 0, 1)
    elif rounds:
        out = (out * 255.0).round() / 255.
    return out
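# Illustrative usage sketch (ranges and tensor shape are hypothetical): add random Poisson
# (shot) noise to a batch of image tensors.
#   batch = torch.rand(4, 3, 64, 64)  # float32 (b, c, h, w) in [0, 1]
#   noisy = random_add_poisson_noise_pt(batch, scale_range=(0.05, 3.0), gray_prob=0.4, clip=True, rounds=False)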
# ------------------------------------------------------------------------ #
# --------------------------- JPEG compression --------------------------- #
# ------------------------------------------------------------------------ #
def add_jpg_compression(img, quality=90):
    """Add JPG compression artifacts.

    Args:
        img (Numpy array): Input image, shape (h, w, c), range [0, 1], float32.
        quality (float): JPG compression quality. 0 for lowest quality, 100 for
            best quality. Default: 90.

    Returns:
        (Numpy array): Returned image after JPG, shape (h, w, c), range [0, 1],
            float32.
    """
    img = np.clip(img, 0, 1)
    encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
    _, encimg = cv2.imencode('.jpg', img * 255., encode_param)
    img = np.float32(cv2.imdecode(encimg, 1)) / 255.
    return img
def random_add_jpg_compression(img, quality_range=(90, 100)):
    """Randomly add JPG compression artifacts.

    Args:
        img (Numpy array): Input image, shape (h, w, c), range [0, 1], float32.
        quality_range (tuple[float] | list[float]): JPG compression quality
            range. 0 for lowest quality, 100 for best quality.
            Default: (90, 100).

    Returns:
        (Numpy array): Returned image after JPG, shape (h, w, c), range [0, 1],
            float32.
    """
    quality = np.random.uniform(quality_range[0], quality_range[1])
    return add_jpg_compression(img, int(quality))
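# Illustrative usage sketch (the quality range is hypothetical): `img` is a float32 HxWxC BGR
# array in [0, 1], matching what cv2.imencode expects above.
#   degraded = random_add_jpg_compression(img, quality_range=(30, 95))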
def load_file_list(file_list_path: str) -> List[Dict[str, str]]:
    files = []
    with open(file_list_path, "r") as fin:
        for line in fin:
            path = line.strip()
            if path:
                files.append({"image_path": path, "prompt": ""})
    return files
# https://github.com/openai/guided-diffusion/blob/main/guided_diffusion/image_datasets.py
def center_crop_arr(pil_image, image_size):
    # We are not on a new enough PIL to support the `reducing_gap`
    # argument, which uses BOX downsampling at powers of two first.
    # Thus, we do it by hand to improve downsample quality.
    while min(*pil_image.size) >= 2 * image_size:
        pil_image = pil_image.resize(
            tuple(x // 2 for x in pil_image.size), resample=Image.BOX
        )

    scale = image_size / min(*pil_image.size)
    pil_image = pil_image.resize(
        tuple(round(x * scale) for x in pil_image.size), resample=Image.BICUBIC
    )

    arr = np.array(pil_image)
    crop_y = (arr.shape[0] - image_size) // 2
    crop_x = (arr.shape[1] - image_size) // 2
    return arr[crop_y: crop_y + image_size, crop_x: crop_x + image_size]
# https://github.com/openai/guided-diffusion/blob/main/guided_diffusion/image_datasets.py
def random_crop_arr(pil_image, image_size, min_crop_frac=0.8, max_crop_frac=1.0):
    min_smaller_dim_size = math.ceil(image_size / max_crop_frac)
    max_smaller_dim_size = math.ceil(image_size / min_crop_frac)
    smaller_dim_size = random.randrange(min_smaller_dim_size, max_smaller_dim_size + 1)

    # We are not on a new enough PIL to support the `reducing_gap`
    # argument, which uses BOX downsampling at powers of two first.
    # Thus, we do it by hand to improve downsample quality.
    while min(*pil_image.size) >= 2 * smaller_dim_size:
        pil_image = pil_image.resize(
            tuple(x // 2 for x in pil_image.size), resample=Image.BOX
        )

    scale = smaller_dim_size / min(*pil_image.size)
    pil_image = pil_image.resize(
        tuple(round(x * scale) for x in pil_image.size), resample=Image.BICUBIC
    )

    arr = np.array(pil_image)
    crop_y = random.randrange(arr.shape[0] - image_size + 1)
    crop_x = random.randrange(arr.shape[1] - image_size + 1)
    return arr[crop_y: crop_y + image_size, crop_x: crop_x + image_size]
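# Illustrative usage sketch (not part of the original module; the file path is hypothetical):
# load an image with PIL and take a random crop for training.
def _demo_random_crop(path="path/to/image.png", image_size=512):
    pil_img = Image.open(path).convert("RGB")
    patch = random_crop_arr(pil_img, image_size)  # uint8 HxWx3 array
    return patch.astype(np.float32) / 255.        # float32 in [0, 1]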