# import os
# os.environ["CUDA_VISIBLE_DEVICES"] ="-1"
import pika
import json
import time
import requests

from merge_topic import main
# from get_config import config_params
from config import get_config
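# RabbitMQ worker: consumes clustering-merge jobs from the merge queue, runs
# merge_topic.main on each message, and posts the merged result to the
# save-clustering API.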

config_params = get_config()
ConfigManager = config_params['ConfigManager']

def update_result(result, type='daily', meta=None):
    # Post the merged clustering result to the save-clustering API.
    # Benchmark ids and source tag ids are read from the first document of the
    # first cluster only.
    meta = meta if meta is not None else {}
    benchmark_children_id = -1
    benchmark_id = -1
    source_tagids = []
    for id_cluster in result:
        for doc in result[id_cluster][:1]:
            source_tagids = doc.get('source_tagids', [])
            for key in doc:
                if "benchmark_child" in key:
                    # Slice off the prefix rather than using str.lstrip, which
                    # removes a character set, not a prefix.
                    benchmark_children_id = int(key[len('benchmark_child_'):])
                if "benchmark" in key and 'child' not in key:
                    benchmark_id = int(key[len('benchmark_'):])
        break

    if not source_tagids:
        source_tagids = []
    if len(source_tagids) > 0:
        # When source tag ids are present, the benchmark ids are zeroed out.
        benchmark_id = 0
        benchmark_children_id = 0

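    # Payload sent to the save-clustering endpoint; the full cluster map is
    # serialized into the "data" field.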
    output = {
        "benchmark_id": benchmark_id,
        "benchmark_children_id": benchmark_children_id,
        "source_tagids": source_tagids,
        "country_code": meta.get('country_code',''),
        "type": type,
        "data": json.dumps(result)
    }

    # with open('test_result.json','w') as f:
    #     json.dump(output, f, ensure_ascii=False)
    
    # url = config_params['api_save_clustering']
    url = ConfigManager['ApiConnects']['api_save_clustering']['BaseUrl']

    res = requests.post(url, json=output)
    print(res.text)
    print('Updated result')

def callback_func(ch, method, properties, body):
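    # Decode the queued job, run the merge pipeline, post the result, then ack.
    # Expected message shape (inferred from usage):
    #   {"type": "daily", "meta": {"country_code": "..."}, ...plus the fields
    #   that merge_topic.main requires}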
    print("Received message")
    starttime = time.time()
    body = json.loads(body.decode("utf-8"))
    
    req = body
    type = req['type']
    meta = req.get('meta', {})
    res = main(req)
    update_result(res, type, meta=meta)
    print('Time process:', time.time() - starttime)
    ch.basic_ack(delivery_tag=method.delivery_tag)


if __name__ == '__main__':
    params = ConfigManager['QueueConfigs']['queue_merge_clustering']
    usr_name = params["UserName"]
    password = str(params["Password"])
    host = params["HostName"]
    virtual_host = params["VirtualHost"]
    queue_name = params["Queue"]

    # params = config_params['queue_merge_clustering']
    # usr_name = params["usr_name"]
    # password = str(params["password"])
    # host = params["host"]
    # virtual_host = params["virtual_host"]
    # queue_name = params["queue_name"]
    
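    # Keep the consumer alive: on any connection error, log it and reconnect.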
    while True:
        try:
            credentials = pika.PlainCredentials(usr_name, password)
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=host, virtual_host=virtual_host, credentials=credentials, heartbeat=3600, blocked_connection_timeout=3600))
            channel = connection.channel()
            channel.queue_declare(queue=queue_name, durable=True, arguments={"x-max-priority": 10})
            print(" * Waiting for messages")
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(queue=queue_name, on_message_callback=callback_func)
            channel.start_consuming()
        except Exception as ex:
            print(f'[ERROR] {ex}')
            # raise ex
            time.sleep(5)  # brief pause before reconnecting to avoid a tight retry loop