# lydorn_utils/async_utils.py
import multiprocessing
import json
import time


def async_func_wrapper(async_func, out_queue):
    """Runs inside a worker subprocess: applies async_func to each queue element until a None sentinel arrives."""
    while True:
        data = out_queue.get()  # Blocking get avoids busy-waiting on out_queue.empty()
        if data is not None:
            async_func(data)
        else:
            break


class AsyncMultiprocess(object):
    def __init__(self, async_func, num_workers=1):
        """
        :param async_func: Called on each queue element. Each worker listens to its queue and applies async_func
            to elements as long as they are not None, stopping at the first None element encountered.
        :param num_workers: Number of worker subprocesses to spawn (one queue per worker).
        """
        assert 0 < num_workers, "num_workers should be at least 1."
        self.num_workers = num_workers
        self.queues = [multiprocessing.Queue() for _ in range(self.num_workers)]
        self.subprocesses = [
            multiprocessing.Process(target=async_func_wrapper, args=(async_func, self.queues[i]))
            for i in range(self.num_workers)
        ]
        self.current_process = 0

    def start(self):
        for subprocess in self.subprocesses:
            subprocess.start()

    def add_work(self, data):
        # TODO: add work to the least busy process (shortest queue); see the sketch after this class
        # For now, work is dispatched round-robin across the workers:
        self.queues[self.current_process].put(data)
        self.current_process = (self.current_process + 1) % self.num_workers

    def join(self):
        for subprocess, queue in zip(self.subprocesses, self.queues):
            queue.put(None)  # None is the sentinel that tells the worker to stop
            subprocess.join()
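

# The TODO in AsyncMultiprocess.add_work could be addressed by sending each piece of work to the worker whose
# queue is currently shortest. The subclass below is only an illustrative sketch (not part of the original API;
# the name LeastBusyAsyncMultiprocess is made up here); it assumes multiprocessing.Queue.qsize() is usable,
# which raises NotImplementedError on some platforms (e.g. macOS).
class LeastBusyAsyncMultiprocess(AsyncMultiprocess):
    def add_work(self, data):
        # Dispatch to the queue that currently holds the fewest pending items:
        queue_sizes = [queue.qsize() for queue in self.queues]
        least_busy = queue_sizes.index(min(queue_sizes))
        self.queues[least_busy].put(data)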


class Async(object):
    def __init__(self, async_func):
        """
        :param async_func: Called on each queue element. The worker listens to the queue and applies async_func
            to elements as long as they are not None, stopping at the first None element encountered.
        """
        self.queue = multiprocessing.Queue()
        self.subprocess = multiprocessing.Process(target=async_func_wrapper, args=(async_func, self.queue))

    def start(self):
        self.subprocess.start()

    def add_work(self, data):
        self.queue.put(data)

    def join(self):
        self.queue.put(None)
        self.subprocess.join()


def main():
    def process(data):
        print("--- process() ---")
        data["out_numbers"] = [number * number for number in data["in_numbers"]]
        time.sleep(0.5)
        return data

    def save(data):
        # Simulates a slow save of data (e.g. to data["filepath"]) by sleeping:
        print("--- save() ---")
        time.sleep(1)
        print("Finished saving")

    num_workers = 8
    data_list = [
        {
            "filepath": "out/data.{}.json".format(i),
            "in_numbers": list(range(1000))
        } for i in range(5)
    ]

    # AsyncMultiprocess
    print("AsyncMultiprocess")
    saver_async_multiprocess = AsyncMultiprocess(save, num_workers)
    saver_async_multiprocess.start()
    t0 = time.time()
    for data in data_list:
        data = process(data)
        saver_async_multiprocess.add_work(data)
    saver_async_multiprocess.join()
    print("Done in {}s".format(time.time() - t0))
print("Finished all!")


if __name__ == "__main__":
    main()