Spaces:
Sleeping
Sleeping
""" | |
General purpose Python functions. | |
TODO(vitchyr): I should probably move this to its own module | |
""" | |
import math | |
import sys | |
import collections | |
import itertools | |
def identity(x): | |
return x | |
def clip_magnitude(value, magnitude): | |
""" | |
Clip the magnitude of value to be within some value. | |
:param value: | |
:param magnitude: | |
:return: | |
""" | |
return min(max(value, -magnitude), magnitude) | |
def is_numeric(x): | |
return not isinstance(x, bool) and ( | |
isinstance(x, int) or isinstance(x, float) | |
) | |
""" | |
Dictionary methods | |
""" | |
def dot_map_dict_to_nested_dict(dot_map_dict): | |
""" | |
Convert something like | |
``` | |
{ | |
'one.two.three.four': 4, | |
'one.six.seven.eight': None, | |
'five.nine.ten': 10, | |
'five.zero': 'foo', | |
} | |
``` | |
into its corresponding nested dict. | |
http://stackoverflow.com/questions/16547643/convert-a-list-of-delimited-strings-to-a-tree-nested-dict-using-python | |
:param dot_map_dict: | |
:return: | |
""" | |
tree = {} | |
for key, item in dot_map_dict.items(): | |
split_keys = key.split('.') | |
if len(split_keys) == 1: | |
if key in tree: | |
raise ValueError("Duplicate key: {}".format(key)) | |
tree[key] = item | |
else: | |
t = tree | |
for sub_key in split_keys[:-1]: | |
t = t.setdefault(sub_key, {}) | |
last_key = split_keys[-1] | |
if not isinstance(t, dict): | |
raise TypeError( | |
"Key inside dot map must point to dictionary: {}".format( | |
key | |
) | |
) | |
if last_key in t: | |
raise ValueError("Duplicate key: {}".format(last_key)) | |
t[last_key] = item | |
return tree | |
def nested_dict_to_dot_map_dict(d, parent_key=''): | |
""" | |
Convert a recursive dictionary into a flat, dot-map dictionary. | |
:param d: e.g. {'a': {'b': 2, 'c': 3}} | |
:param parent_key: Used for recursion | |
:return: e.g. {'a.b': 2, 'a.c': 3} | |
""" | |
items = [] | |
for k, v in d.items(): | |
new_key = parent_key + "." + k if parent_key else k | |
if isinstance(v, collections.MutableMapping): | |
items.extend(nested_dict_to_dot_map_dict(v, new_key).items()) | |
else: | |
items.append((new_key, v)) | |
return dict(items) | |
def merge_recursive_dicts(a, b, path=None, | |
ignore_duplicate_keys_in_second_dict=False): | |
""" | |
Merge two dicts that may have nested dicts. | |
""" | |
if path is None: path = [] | |
for key in b: | |
if key in a: | |
if isinstance(a[key], dict) and isinstance(b[key], dict): | |
merge_recursive_dicts(a[key], b[key], path + [str(key)], | |
ignore_duplicate_keys_in_second_dict=ignore_duplicate_keys_in_second_dict) | |
elif a[key] == b[key]: | |
print("Same value for key: {}".format(key)) | |
else: | |
duplicate_key = '.'.join(path + [str(key)]) | |
if ignore_duplicate_keys_in_second_dict: | |
print("duplicate key ignored: {}".format(duplicate_key)) | |
else: | |
raise Exception( | |
'Duplicate keys at {}'.format(duplicate_key) | |
) | |
else: | |
a[key] = b[key] | |
return a | |
def dict_of_list__to__list_of_dicts(dict, n_items): | |
""" | |
``` | |
x = {'foo': [3, 4, 5], 'bar': [1, 2, 3]} | |
ppp.dict_of_list__to__list_of_dicts(x, 3) | |
# Output: | |
# [ | |
# {'foo': 3, 'bar': 1}, | |
# {'foo': 4, 'bar': 2}, | |
# {'foo': 5, 'bar': 3}, | |
# ] | |
``` | |
:param dict: | |
:param n_items: | |
:return: | |
""" | |
new_dicts = [{} for _ in range(n_items)] | |
for key, values in dict.items(): | |
for i in range(n_items): | |
new_dicts[i][key] = values[i] | |
return new_dicts | |
def list_of_dicts__to__dict_of_lists(lst): | |
""" | |
``` | |
x = [ | |
{'foo': 3, 'bar': 1}, | |
{'foo': 4, 'bar': 2}, | |
{'foo': 5, 'bar': 3}, | |
] | |
ppp.list_of_dicts__to__dict_of_lists(x) | |
# Output: | |
# {'foo': [3, 4, 5], 'bar': [1, 2, 3]} | |
``` | |
""" | |
if len(lst) == 0: | |
return {} | |
keys = lst[0].keys() | |
output_dict = collections.defaultdict(list) | |
for d in lst: | |
assert set(d.keys()) == set(keys) | |
for k in keys: | |
output_dict[k].append(d[k]) | |
return output_dict | |
def safe_json(data): | |
if data is None: | |
return True | |
elif isinstance(data, (bool, int, float)): | |
return True | |
elif isinstance(data, (tuple, list)): | |
return all(safe_json(x) for x in data) | |
elif isinstance(data, dict): | |
return all(isinstance(k, str) and safe_json(v) for k, v in data.items()) | |
return False | |
def dict_to_safe_json(d, sort=False): | |
""" | |
Convert each value in the dictionary into a JSON'able primitive. | |
:param d: | |
:return: | |
""" | |
if isinstance(d, collections.OrderedDict): | |
new_d = collections.OrderedDict() | |
else: | |
new_d = {} | |
for key, item in d.items(): | |
if safe_json(item): | |
new_d[key] = item | |
else: | |
if ( | |
isinstance(item, dict) | |
or isinstance(item, collections.OrderedDict) | |
): | |
new_d[key] = dict_to_safe_json(item, sort=sort) | |
else: | |
new_d[key] = str(item) | |
if sort: | |
return collections.OrderedDict(sorted(new_d.items())) | |
else: | |
return new_d | |
def recursive_items(dictionary): | |
""" | |
Get all (key, item) recursively in a potentially recursive dictionary. | |
Usage: | |
``` | |
x = { | |
'foo' : { | |
'bar' : 5 | |
} | |
} | |
recursive_items(x) | |
# output: | |
# ('foo', {'bar' : 5}) | |
# ('bar', 5) | |
``` | |
:param dictionary: | |
:return: | |
""" | |
for key, value in dictionary.items(): | |
yield key, value | |
if type(value) is dict: | |
yield from recursive_items(value) | |
# TODO(vitchyr): test methods/classes below | |
""" | |
Itertools++ | |
""" | |
def map_recursive(fctn, x_or_iterable): | |
""" | |
Apply `fctn` to each element in x_or_iterable. | |
This is a generalization of the map function since this will work | |
recursively for iterables. | |
:param fctn: Function from element of iterable to something. | |
:param x_or_iterable: An element or an Iterable of an element. | |
:return: The same (potentially recursive) iterable but with | |
all the elements transformed by fctn. | |
""" | |
# if isinstance(x_or_iterable, Iterable): | |
if isinstance(x_or_iterable, list) or isinstance(x_or_iterable, tuple): | |
return type(x_or_iterable)( | |
map_recursive(fctn, item) for item in x_or_iterable | |
) | |
else: | |
return fctn(x_or_iterable) | |
def filter_recursive(x_or_iterable): | |
""" | |
Filter out elements that are Falsy (where bool(x) is False) from | |
potentially recursive lists. | |
:param x_or_iterable: An element or a list. | |
:return: If x_or_iterable is not an Iterable, then return x_or_iterable. | |
Otherwise, return a filtered version of x_or_iterable. | |
""" | |
if isinstance(x_or_iterable, list): | |
new_items = [] | |
for sub_elem in x_or_iterable: | |
filtered_sub_elem = filter_recursive(sub_elem) | |
if filtered_sub_elem is not None and not ( | |
isinstance(filtered_sub_elem, list) and | |
len(filtered_sub_elem) == 0 | |
): | |
new_items.append(filtered_sub_elem) | |
return new_items | |
else: | |
return x_or_iterable | |
def batch(iterable, n=1): | |
""" | |
Split an interable into batches of size `n`. If `n` does not evenly divide | |
`iterable`, the last slice will be smaller. | |
https://stackoverflow.com/questions/8290397/how-to-split-an-iterable-in-constant-size-chunks | |
Usage: | |
``` | |
for i in batch(range(0,10), 3): | |
print i | |
[0,1,2] | |
[3,4,5] | |
[6,7,8] | |
[9] | |
``` | |
""" | |
l = len(iterable) | |
for ndx in range(0, l, n): | |
yield iterable[ndx:min(ndx + n, l)] | |
def takespread(sequence, num): | |
""" | |
Get `num` elements from the sequence that are as spread out as possible. | |
https://stackoverflow.com/questions/9873626/choose-m-evenly-spaced-elements-from-a-sequence-of-length-n | |
:param sequence: | |
:param num: | |
:return: | |
""" | |
length = float(len(sequence)) | |
for i in range(num): | |
yield sequence[int(math.ceil(i * length / num))] | |
""" | |
Custom Classes | |
""" | |
class IntIdDict(collections.defaultdict): | |
""" | |
Automatically assign int IDs to hashable objects. | |
Usage: | |
``` | |
id_map = IntIdDict() | |
print(id_map['a']) | |
print(id_map['b']) | |
print(id_map['c']) | |
print(id_map['a']) | |
print(id_map['b']) | |
print(id_map['a']) | |
print('') | |
print(id_map.get_inverse(0)) | |
print(id_map.get_inverse(1)) | |
print(id_map.get_inverse(2)) | |
``` | |
Output: | |
``` | |
1 | |
2 | |
3 | |
1 | |
2 | |
1 | |
'a' | |
'b' | |
'c' | |
``` | |
:return: | |
""" | |
def __init__(self, **kwargs): | |
c = itertools.count() | |
self.inverse_dict = {} | |
super().__init__(lambda: next(c), **kwargs) | |
def __getitem__(self, y): | |
int_id = super().__getitem__(y) | |
self.inverse_dict[int_id] = y | |
return int_id | |
def reverse_id(self, int_id): | |
return self.inverse_dict[int_id] | |
class ConditionTimer(object): | |
""" | |
A timer that goes off after the a fixed time period. | |
The catch: you need to poll it and provide it the time! | |
Usage: | |
``` | |
timer = PollTimer(100) # next check will be true at 100 | |
timer.check(90) # False | |
timer.check(110) # True. Next check will go off at 110 + 100 = 210 | |
timer.check(205) # False | |
timer.check(210) # True | |
``` | |
""" | |
def __init__(self, trigger_period): | |
""" | |
:param trigger_period: If None or 0, `check` will always return False. | |
""" | |
self.last_time_triggered = 0 | |
if trigger_period is None: | |
trigger_period = 0 | |
self.trigger_period = trigger_period | |
def check(self, time): | |
if self.always_false: | |
return False | |
if time - self.last_time_triggered >= self.trigger_period: | |
self.last_time_triggered = time | |
return True | |
else: | |
return False | |
def always_false(self): | |
return self.trigger_period == 0 | |
def string_tuple_to_string(strings): | |
if len(strings) == 0: | |
string = "" | |
elif len(strings) == 1: | |
string = strings[0] | |
else: | |
string = " ".join([str(s) for s in strings]) | |
return string | |
class _Logger(object): | |
def __init__(self): | |
self.n_chars = 0 | |
self.lines = [] | |
def print_over(self, *strings): | |
""" | |
Remove anything printed in the last printover call. Then print `string` | |
""" | |
string = string_tuple_to_string(strings) | |
sys.stdout.write("\r" * self.n_chars) | |
sys.stdout.write(string) | |
sys.stdout.flush() | |
self.n_chars = len(string) | |
def newline(self): | |
sys.stdout.write("\n") | |
sys.stdout.flush() | |
self.n_chars = 0 | |
line_logger = _Logger() | |
def find_key_recursive(obj, key): | |
if key in obj: | |
return obj[key] | |
for k, v in obj.items(): | |
if isinstance(v, dict): | |
result = find_key_recursive(v, key) | |
if result is not None: | |
return result |