File size: 4,562 Bytes
5620e1f
 
 
 
 
 
 
6a14896
 
 
 
 
 
5620e1f
c997974
5620e1f
c997974
 
5620e1f
c997974
 
 
5620e1f
 
 
 
b3ec1fd
5620e1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fba1e90
5620e1f
 
 
fba1e90
5620e1f
 
 
ec039dd
fba1e90
5620e1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a14896
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import html
import json
import logging
import re
from datetime import datetime
from uuid import uuid4

import requests
from langchain.schema import (
    HumanMessage,
    SystemMessage,
)




def save_logs(scheduler, JSON_DATASET_PATH, logs, feedback=None) -> None:
    """ Append one interaction record (question/answer log) as a single JSON line.

        The records are used for usage statistics and model evaluation.
        User feedback is stored alongside the record when provided.

        Args:
            scheduler: object exposing a ``lock`` context manager; the lock is
                held for the duration of the file write.
            JSON_DATASET_PATH: object with a pathlib-style ``open("a")`` method
                pointing at the JSON-lines file.
            logs: JSON-serialisable dict. It is modified in place: a truthy
                ``feedback`` is stored under the "feedback" key.
            feedback: optional feedback payload; ignored when falsy.

        Raises:
            TypeError, ValueError: if ``logs`` cannot be serialised. In that
                case nothing is written to the file.
    """
    if feedback:
        logs["feedback"] = feedback  # optional

    # Serialise before touching the file: json.dump() streams the encoding
    # chunk by chunk, so a value that cannot be encoded would leave a
    # truncated line behind and break every later reader of the file.
    record = json.dumps(logs)

    with scheduler.lock:
        with JSON_DATASET_PATH.open("a") as f:
            f.write(record + "\n")
    print("logging done")

def get_message_template(type, SYSTEM_PROMPT, USER_PROMPT):
    """ Build the system + user message pair in the format selected by ``type``.

        'NVIDIA'    -> list of two ``{"role": ..., "content": ...}`` dicts.
        'DEDICATED' -> list of a SystemMessage followed by a HumanMessage.
        anything else -> None.
    """
    if type == 'NVIDIA':
        system_entry = {"role": "system", "content": SYSTEM_PROMPT}
        user_entry = {"role": "user", "content": USER_PROMPT}
        return [system_entry, user_entry]

    if type == 'DEDICATED':
        return [
            SystemMessage(content=SYSTEM_PROMPT),
            HumanMessage(content=USER_PROMPT),
        ]

    # Unknown backend type: the caller receives None.
    return None


def make_html_source(source,i):
    """
    Render one source document as an HTML card for display in the "source" side tab.

    Args:
        source: object with a ``metadata`` mapping (keys 'subtype' and 'page'
            are read) and a ``page_content`` string.
        i: document index, used in the card id (``doc{i}``) and the heading.

    Returns:
        The HTML snippet for the card as a string.

    The document text and the 'subtype' value are HTML-escaped before being
    interpolated, so characters such as ``<``, ``&`` or quotes in the source
    are shown literally instead of being parsed as markup.
    """
    meta = source.metadata
    content = html.escape(source.page_content.strip())

    # 'subtype' is used as the heading label, the footer label and the link
    # target; quotes are escaped too so it cannot terminate the href attribute.
    name = html.escape(str(meta['subtype']))
    page = int(meta['page'])  # page may arrive as a float such as 3.0

    card = f"""
        <div class="card" id="doc{i}">
            <div class="card-content">
                <h2>Doc {i} - {name} - Page {page}</h2>
                <p>{content}</p>
            </div>
            <div class="card-footer">
                <span>{name}</span>
                <a href="{name}#page={page}" target="_blank" class="pdf-link">
                    <span role="img" aria-label="Open PDF">🔗</span>
                </a>
            </div>
        </div>
        """

    return card


def parse_output_llm_with_sources(output):
    """ Replace "[Doc X]" / "[Doc X, Doc Y]" citations in the LLM output with
        superscript HTML links pointing at the matching "#docX" anchors.

        Only text matched by the citation pattern is rewritten. Ordinary text
        that merely begins with "Doc" (e.g. "Doctors say ...") is left untouched.

        Args:
            output: the raw answer text.

        Returns:
            The text with every bracketed citation replaced by one anchor per
            referenced document number.
    """
    def _render_reference(match):
        # The captured group looks like "Doc 1" or "Doc 1, Doc 2"; each entry
        # carries exactly one run of digits, which is the document number.
        doc_ids = re.findall(r'\d+', match.group(1))
        return "".join(
            f"""<a href="#doc{doc_id}" class="a-doc-ref" target="_self"><span class='doc-ref'><sup>{doc_id}</sup></span></a>"""
            for doc_id in doc_ids
        )

    return re.sub(r'\[(Doc\s?\d+(?:,\s?Doc\s?\d+)*)\]', _render_reference, output)

def get_client_ip(request=None):
    """Get the client IP address from the request context.

    The first entry of the ``X-Forwarded-For`` header is used when the header
    is present; otherwise ``request.client.host`` is used. The header is read
    first so that a request with no ``client`` attribute value still resolves
    to the forwarded address instead of the fallback.

    Args:
        request: object exposing ``headers.get(...)`` and ``client.host``,
            or None.

    Returns:
        The detected IP string, or "127.0.0.1" when no request is given or
        the address cannot be determined.
    """
    if not request:
        return "127.0.0.1"

    try:
        # NOTE: X-Forwarded-For is a request header and can be set by the
        # caller; only rely on it when a trusted proxy populates it.
        forwarded_for = request.headers.get('X-Forwarded-For')
        if forwarded_for:
            # X-Forwarded-For can contain multiple IPs - first one is the client
            ip = forwarded_for.split(',')[0].strip()
        else:
            ip = request.client.host

        logging.debug("Client IP detected: %s", ip)
        return ip
    except Exception as e:
        # Deliberately best-effort: any failure falls back to the loopback address.
        logging.error("Error getting client IP: %s", e)
    return "127.0.0.1"


def get_client_location(ip_address) -> dict | None:
    """Look up geolocation info for an IP address via ipapi.co.

    Returns a dict with 'city', 'region', 'country', 'latitude' and
    'longitude' on an HTTP 200 reply. Returns None on any other status code
    or when the request raises a requests exception (both cases are logged).
    """
    # Browser-like User-Agent so the request doesn't get blocked.
    browser_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    # (key in the returned dict, key in the API payload)
    field_map = (
        ('city', 'city'),
        ('region', 'region'),
        ('country', 'country_name'),
        ('latitude', 'latitude'),
        ('longitude', 'longitude'),
    )
    endpoint = f'https://ipapi.co/{ip_address}/json/'

    try:
        reply = requests.get(endpoint, headers=browser_headers, timeout=5)
        status = reply.status_code

        if status == 200:
            payload = reply.json()
            return {out_key: payload.get(api_key) for out_key, api_key in field_map}

        if status == 429:
            logging.warning(f"Rate limit exceeded. Response: {reply.text}")
        else:
            logging.error(f"Error: Status code {status}. Response: {reply.text}")

    except requests.exceptions.RequestException as e:
        logging.error(f"Request failed: {str(e)}")

    return None