File size: 3,054 Bytes
abd2a81
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import multiprocessing
import json
import time


def async_func_wrapper(async_func, out_queue):
    """Feed items from *out_queue* to *async_func* until a ``None`` sentinel arrives.

    This is the target run inside each worker process. It blocks on
    ``out_queue.get()`` rather than polling ``out_queue.empty()``. The polling
    version spun a CPU core at 100% while the queue was idle, and
    ``multiprocessing.Queue.empty()`` is documented as unreliable.

    :param async_func: Callable invoked once per item, in queue (FIFO) order.
        Its return value is ignored.
    :param out_queue: Queue-like object with a blocking ``get()`` (e.g.
        ``multiprocessing.Queue``). The first ``None`` item ends the loop. Any
        other value, including falsy ones such as ``0`` or ``{}``, is passed
        to *async_func*.
    """
    while True:
        data = out_queue.get()  # blocks until an item is available
        # Identity test (not ==) so items with unusual __eq__ are never
        # mistaken for the sentinel.
        if data is None:
            break
        async_func(data)


class AsyncMultiprocess(object):
    """Spread work items round-robin over a fixed pool of worker processes.

    Every worker gets its own ``multiprocessing.Queue`` and runs
    ``async_func_wrapper(async_func, queue)``, so each item passed to
    :meth:`add_work` is handled by exactly one worker.
    """

    def __init__(self, async_func, num_workers=1):
        """
        :param async_func: Callable invoked in a worker process once for each
            non-``None`` item passed to :meth:`add_work`. It receives the
            item, not the queue. It is pickled under the ``spawn`` start
            method, so it should be a module-level function.
        :param num_workers: Number of worker processes (one queue each);
            must be at least 1.
        :raises ValueError: if ``num_workers`` is less than 1.
        """
        # Explicit check rather than ``assert``: assertions are stripped under
        # ``python -O``. That would let num_workers=0 through and defer the
        # failure to an IndexError in add_work().
        if num_workers < 1:
            raise ValueError("num_workers should be at least 1.")
        self.num_workers = num_workers
        self.queues = [multiprocessing.Queue() for _ in range(num_workers)]
        self.subprocesses = [
            multiprocessing.Process(target=async_func_wrapper, args=(async_func, queue))
            for queue in self.queues
        ]
        # Index of the worker that receives the next add_work() item.
        self.current_process = 0

    def start(self):
        """Start every worker process."""
        for subprocess in self.subprocesses:
            subprocess.start()

    def add_work(self, data):
        """Enqueue *data* for the next worker in round-robin order.

        ``None`` is the workers' stop sentinel. Passing it here would shut
        down whichever worker receives it.
        """
        # TODO: add work to the least busy process (shortest queue)
        self.queues[self.current_process].put(data)
        self.current_process = (self.current_process + 1) % self.num_workers

    def join(self):
        """Send each worker the ``None`` stop sentinel and wait for it to exit.

        Items queued before the sentinel are still processed first.
        """
        for subprocess, queue in zip(self.subprocesses, self.queues):
            queue.put(None)
            subprocess.join()


class Async(object):
    """Run one worker process that consumes items posted via :meth:`add_work`.

    Single-worker counterpart of ``AsyncMultiprocess``: one
    ``multiprocessing.Queue`` feeds one process running
    ``async_func_wrapper(async_func, queue)``.
    """

    def __init__(self, async_func):
        """
        :param async_func: Callable run in the worker process for every
            non-``None`` item taken from the queue. The worker stops at the
            first ``None``.
        """
        work_queue = multiprocessing.Queue()
        worker = multiprocessing.Process(
            target=async_func_wrapper,
            args=(async_func, work_queue),
        )
        self.queue = work_queue
        self.subprocess = worker

    def start(self):
        """Launch the worker process."""
        worker = self.subprocess
        worker.start()

    def add_work(self, data):
        """Hand *data* to the worker (FIFO)."""
        work_queue = self.queue
        work_queue.put(data)

    def join(self):
        """Enqueue the ``None`` stop sentinel, then block until the worker exits."""
        sentinel = None
        self.queue.put(sentinel)
        self.subprocess.join()


def _process(data):
    """Fill ``data["out_numbers"]`` with the squares of ``data["in_numbers"]``.

    Mutates *data* in place and returns it. The 0.5 s sleep presumably stands
    in for real processing time.
    """
    print("--- process() ---")
    data["out_numbers"] = [number * number for number in data["in_numbers"]]
    time.sleep(0.5)
    return data


def _save(data):
    """Pretend to save *data*: print progress and sleep 1 s.

    Nothing is written yet; ``data["filepath"]`` is currently unused.
    """
    print("--- save() ---")
    time.sleep(1)
    print("Finished saving")


def main():
    """Demo: square numbers in the parent process and "save" them in workers.

    ``_process`` and ``_save`` are module-level rather than nested in here.
    Under the ``spawn``/``forkserver`` start methods, ``multiprocessing``
    pickles a Process's args, and nested (local) functions cannot be pickled.
    """
    num_workers = 8
    data_list = [
        {
            "filepath": "out/data.{}.json".format(i),
            "in_numbers": list(range(1000)),
        }
        for i in range(5)
    ]

    # AsyncMultiprocess
    print("AsyncMultiprocess")
    saver_async_multiprocess = AsyncMultiprocess(_save, num_workers)
    saver_async_multiprocess.start()

    t0 = time.time()
    for data in data_list:
        data = _process(data)
        saver_async_multiprocess.add_work(data)
    saver_async_multiprocess.join()
    print("Done in {}s".format(time.time() - t0))

    print("Finished all!")


# Keep this guard. With multiprocessing's "spawn" start method (the default on
# Windows and macOS), worker processes import this module, so main() must not
# run at import time or each worker would try to start processes of its own.
if __name__ == "__main__":
    main()