Spaces:
Runtime error
Runtime error
File size: 5,099 Bytes
6b59850 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
###############################################################################
#
# Adapted from https://github.com/lrjconan/GRAN/ which in turn is adapted from https://github.com/JiaxuanYou/graph-generation
#
###############################################################################
import pyemd
import numpy as np
import concurrent.futures
from functools import partial
from scipy.linalg import toeplitz
def emd(x, y, distance_scaling=1.0):
    ''' Earth mover's distance between two 1D histograms

    NOTE: a later definition of ``emd`` in this file rebinds the name, so
    after the module is fully loaded this version is no longer reachable
    through the module-level name.

    Args:
      x, y: 1D numpy arrays of histogram values; the shorter one is
        zero-padded on the right so both have the same length
      distance_scaling: divisor applied to the ground-distance matrix

    Returns:
      the value computed by ``pyemd.emd``
    '''
    n_bins = max(len(x), len(y))
    # ground distance between bins i and j is |i - j| / distance_scaling
    ground_dist = toeplitz(range(n_bins)).astype(float) / distance_scaling

    # pyemd works on float histograms of identical length
    p = x.astype(float)
    q = y.astype(float)
    if len(p) < len(q):
        p = np.concatenate((p, np.zeros(n_bins - len(p))))
    elif len(q) < len(p):
        q = np.concatenate((q, np.zeros(n_bins - len(q))))

    return pyemd.emd(p, q, ground_dist)
def l2(x, y):
    ''' 2-norm of the difference ``x - y``

    Args:
      x, y: numpy arrays whose difference can be formed directly (no
        padding is applied here)
    '''
    return np.linalg.norm(x - y, 2)
def emd(x, y, sigma=1.0, distance_scaling=1.0):
    ''' Absolute earth mover's distance between two 1D histograms

    Args:
      x, y: 1D numpy arrays of histogram values; the shorter one is
        zero-padded on the right so both have the same length
      sigma: accepted so the signature matches the Gaussian kernels in this
        file, but not used in the computation
      distance_scaling: divisor applied to the ground-distance matrix

    Returns:
      ``abs`` of the value computed by ``pyemd.emd``
    '''
    n_bins = max(len(x), len(y))
    # ground distance between bins i and j is |i - j| / distance_scaling
    ground_dist = toeplitz(range(n_bins)).astype(float) / distance_scaling

    # pyemd works on float histograms of identical length
    p = x.astype(float)
    q = y.astype(float)
    if len(p) < len(q):
        p = np.concatenate((p, np.zeros(n_bins - len(p))))
    elif len(q) < len(p):
        q = np.concatenate((q, np.zeros(n_bins - len(q))))

    raw = pyemd.emd(p, q, ground_dist)
    return np.abs(raw)
def gaussian_emd(x, y, sigma=1.0, distance_scaling=1.0):
    ''' Gaussian kernel whose squared distance term is the squared EMD

    Args:
      x, y: 1D numpy arrays of histogram values; the shorter one is
        zero-padded on the right so both have the same length
      sigma: standard deviation of the Gaussian
      distance_scaling: divisor applied to the ground-distance matrix

    Returns:
      exp(-emd^2 / (2 * sigma^2))
    '''
    n_bins = max(len(x), len(y))
    # ground distance between bins i and j is |i - j| / distance_scaling
    ground_dist = toeplitz(range(n_bins)).astype(float) / distance_scaling

    # pyemd works on float histograms of identical length
    p = x.astype(float)
    q = y.astype(float)
    if len(p) < len(q):
        p = np.concatenate((p, np.zeros(n_bins - len(p))))
    elif len(q) < len(p):
        q = np.concatenate((q, np.zeros(n_bins - len(q))))

    dist = pyemd.emd(p, q, ground_dist)
    return np.exp(-dist * dist / (2 * sigma * sigma))
def gaussian(x, y, sigma=1.0):
    ''' Gaussian kernel on the Euclidean distance between two histograms

    Args:
      x, y: 1D numpy arrays of histogram values; the shorter one is
        zero-padded on the right so both have the same length
      sigma: standard deviation of the Gaussian

    Returns:
      exp(-||x - y||_2^2 / (2 * sigma^2))
    '''
    n_bins = max(len(x), len(y))

    # work on float copies of identical length
    p = x.astype(float)
    q = y.astype(float)
    if len(p) < len(q):
        p = np.concatenate((p, np.zeros(n_bins - len(p))))
    elif len(q) < len(p):
        q = np.concatenate((q, np.zeros(n_bins - len(q))))

    l2_dist = np.linalg.norm(p - q, 2)
    return np.exp(-l2_dist * l2_dist / (2 * sigma * sigma))
def gaussian_tv(x, y, sigma=1.0):
    ''' Gaussian kernel on the total-variation distance between two histograms

    Args:
      x, y: 1D numpy arrays of histogram values; the shorter one is
        zero-padded on the right so both have the same length
      sigma: standard deviation of the Gaussian

    Returns:
      exp(-tv^2 / (2 * sigma^2)) where tv = sum(|x - y|) / 2
    '''
    n_bins = max(len(x), len(y))

    # work on float copies of identical length
    p = x.astype(float)
    q = y.astype(float)
    if len(p) < len(q):
        p = np.concatenate((p, np.zeros(n_bins - len(p))))
    elif len(q) < len(p):
        q = np.concatenate((q, np.zeros(n_bins - len(q))))

    tv_dist = np.sum(np.abs(p - q)) / 2.0
    return np.exp(-tv_dist * tv_dist / (2 * sigma * sigma))
def kernel_parallel_unpacked(x, samples2, kernel):
    ''' Sum of ``kernel(x, s2)`` over every ``s2`` in ``samples2``

    Args:
      x: a single sample, passed as the kernel's first argument
      samples2: iterable of samples, each passed as the second argument
      kernel: callable taking two samples

    Returns:
      the accumulated kernel values (0 when ``samples2`` is empty)
    '''
    return sum(kernel(x, other) for other in samples2)
def kernel_parallel_worker(t):
    ''' Single-argument adapter around ``kernel_parallel_unpacked``

    Args:
      t: sequence unpacked positionally into
        ``kernel_parallel_unpacked`` (i.e. ``(x, samples2, kernel)``)

    Returns:
      whatever ``kernel_parallel_unpacked`` returns for those arguments
    '''
    # executor.map hands over one item per call, so the arguments travel
    # bundled in a tuple and are spread out here
    row_total = kernel_parallel_unpacked(*t)
    return row_total
def disc(samples1, samples2, kernel, is_parallel=True, *args, **kwargs):
    ''' Discrepancy between 2 samples: mean kernel value over all pairs

    Args:
      samples1: sequence of samples, each used as the kernel's first argument
      samples2: sequence of samples, each used as the kernel's second argument
      kernel: callable invoked as ``kernel(s1, s2, *args, **kwargs)``
      is_parallel: when true, one task per element of ``samples1`` is run
        on a thread pool; otherwise a plain nested loop is used
      *args, **kwargs: extra arguments forwarded to ``kernel`` after the
        two samples

    Returns:
      sum of the kernel over all (s1, s2) pairs divided by the number of
      pairs, or 1e+6 when either sample set is empty
    '''
    d = 0
    if not is_parallel:
        for s1 in samples1:
            for s2 in samples2:
                d += kernel(s1, s2, *args, **kwargs)
    else:
        def row_total(s1):
            # Extra positional arguments must come AFTER the two samples,
            # exactly as in the serial branch. Pre-binding them with
            # functools.partial would place them first and silently call
            # the kernel with shifted arguments.
            return sum(kernel(s1, s2, *args, **kwargs) for s2 in samples2)

        with concurrent.futures.ThreadPoolExecutor() as executor:
            for dist in executor.map(row_total, samples1):
                d += dist

    n_pairs = len(samples1) * len(samples2)
    if n_pairs > 0:
        d /= n_pairs
    else:
        # sentinel "very large" discrepancy when there is nothing to compare
        d = 1e+6
    return d
def compute_mmd(samples1, samples2, kernel, is_hist=True, *args, **kwargs):
    ''' MMD between two samples

    Args:
      samples1, samples2: sequences of samples (histograms when ``is_hist``)
      kernel: kernel callable handed to ``disc``
      is_hist: when true, every sample is divided by its sum (plus 1e-6)
        before the kernel is evaluated
      *args, **kwargs: forwarded unchanged to ``disc``; note that the first
        extra positional argument therefore fills ``disc``'s
        ``is_parallel`` parameter

    Returns:
      disc(s1, s1) + disc(s2, s2) - 2 * disc(s1, s2)
    '''
    if is_hist:
        # turn histograms into pmfs; the epsilon keeps all-zero histograms
        # from dividing by zero
        samples1 = [hist / (np.sum(hist) + 1e-6) for hist in samples1]
        samples2 = [hist / (np.sum(hist) + 1e-6) for hist in samples2]

    within_first = disc(samples1, samples1, kernel, *args, **kwargs)
    within_second = disc(samples2, samples2, kernel, *args, **kwargs)
    across = disc(samples1, samples2, kernel, *args, **kwargs)
    return within_first + within_second - 2 * across
def _mean_histogram(samples):
    ''' Element-wise mean of 1D histograms, zero-padding shorter ones on the right '''
    hists = [np.asarray(hist, dtype=float) for hist in samples]
    if not hists:
        raise ValueError('cannot average an empty collection of histograms')
    n_bins = max(len(hist) for hist in hists)
    total = np.zeros(n_bins)
    for hist in hists:
        # histograms may differ in length; missing trailing bins count as 0,
        # the same padding convention the kernels in this file use
        total[:len(hist)] += hist
    return total / len(hists)


def compute_emd(samples1, samples2, kernel, is_hist=True, *args, **kwargs):
    ''' EMD between average of two samples

    Args:
      samples1, samples2: sequences of 1D histograms (possibly of different
        lengths) when ``is_hist`` is true; otherwise passed to ``disc``
        untouched
      kernel: kernel callable handed to ``disc``
      is_hist: when true, each sample set is replaced by a single
        histogram, the bin-wise mean of its members
      *args, **kwargs: forwarded unchanged to ``disc``

    Returns:
      tuple ``(d, [first1, first2])`` where ``d`` is the value returned by
      ``disc`` and ``first1`` / ``first2`` are the first elements of the
      (possibly averaged) sample lists

    Raises:
      ValueError: if ``is_hist`` is true and a sample set is empty
    '''
    if is_hist:
        # Average bin-by-bin. A bare np.mean(samples) would collapse the
        # whole set to one scalar, which the histogram kernels cannot take.
        samples1 = [_mean_histogram(samples1)]
        samples2 = [_mean_histogram(samples2)]
    distance = disc(samples1, samples2, kernel, *args, **kwargs)
    return distance, [samples1[0], samples2[0]]
|