File size: 4,867 Bytes
5620e1f
 
 
 
 
 
 
6a14896
 
 
 
5a3e284
a7ba16f
 
 
 
 
8aba5e3
 
5a3e284
5f9af61
 
 
 
5981899
 
5a3e284
41235aa
5a3e284
 
 
 
 
5981899
a7ba16f
5981899
5620e1f
a7ba16f
5620e1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fba1e90
5620e1f
 
 
fba1e90
5620e1f
 
 
ec039dd
fba1e90
5620e1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a14896
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import re
import logging
import json
from langchain.schema import (
    HumanMessage,
    SystemMessage,
)
import requests
from datetime import datetime
from uuid import uuid4


# TESTING DEBUG LOG
from auditqa.logging_config import setup_logging
setup_logging()
import logging
logger = logging.getLogger(__name__)


def save_logs(scheduler, JSON_DATASET_PATH, logs, feedback=None) -> None:
    """ Every interaction with app saves the log of question and answer, 
        this is to get the usage statistics of app and evaluate model performances.
        Also saves user feedback (when provided).
    """
    try:
        if feedback:
            logs["feedback"] = feedback #optional
        
        with scheduler.lock:
            with open(JSON_DATASET_PATH, 'a') as f:
                json.dump(logs, f)
                f.write("\n")
            logger.info("logging done")
    except Exception as e:
        logger.error(f"Failed to save logs to {JSON_DATASET_PATH}: {str(e)}")
        raise


def get_message_template(type, SYSTEM_PROMPT, USER_PROMPT):
    if type == 'NVIDIA':
        messages =  [{"role": "system", "content": SYSTEM_PROMPT},
                {"role":"user","content":USER_PROMPT}]
    elif type == 'DEDICATED':
        messages = [
                 SystemMessage(content=SYSTEM_PROMPT),
                 HumanMessage(content=USER_PROMPT),]
    else:
        messages = None
    
    return messages


def make_html_source(source,i):
    """
    takes the text and converts it into html format for display in "source" side tab
    """
    meta = source.metadata
    content = source.page_content.strip()

    name = meta['subtype']
    card = f"""
        <div class="card" id="doc{i}">
            <div class="card-content">
                <h2>Doc {i} - {meta['subtype']} - Page {int(meta['page'])}</h2>
                <p>{content}</p>
            </div>
            <div class="card-footer">
                <span>{name}</span>
                <a href="{meta['subtype']}#page={int(meta['page'])}" target="_blank" class="pdf-link">
                    <span role="img" aria-label="Open PDF">🔗</span>
                </a>
            </div>
        </div>
        """

    return card


def parse_output_llm_with_sources(output):
    # Split the content into a list of text and "[Doc X]" references
    content_parts = re.split(r'\[(Doc\s?\d+(?:,\s?Doc\s?\d+)*)\]', output)
    parts = []
    for part in content_parts:
        if part.startswith("Doc"):
            subparts = part.split(",")
            subparts = [subpart.lower().replace("doc","").strip() for subpart in subparts]
            subparts = [f"""<a href="#doc{subpart}" class="a-doc-ref" target="_self"><span class='doc-ref'><sup>{subpart}</sup></span></a>""" for subpart in subparts]
            parts.append("".join(subparts))
        else:
            parts.append(part)
    content_parts = "".join(parts)
    return content_parts

def get_client_ip(request=None):
    """Get the client IP address from the request context"""
    try:
        if request:
            # Try different headers that might contain the real IP
            ip = request.client.host
            # Check for proxy headers
            forwarded_for = request.headers.get('X-Forwarded-For')
            if forwarded_for:
                # X-Forwarded-For can contain multiple IPs - first one is the client
                ip = forwarded_for.split(',')[0].strip()
            
            logging.debug(f"Client IP detected: {ip}")
            return ip
    except Exception as e:
        logging.error(f"Error getting client IP: {e}")
    return "127.0.0.1"


def get_client_location(ip_address) -> dict | None:
    """Get geolocation info using ipapi.co"""
    # Add headers so we don't get blocked...
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    try:
        response = requests.get(
            f'https://ipapi.co/{ip_address}/json/',
            headers=headers,
            timeout=5
        )
        if response.status_code == 200:
            data = response.json()
            return {
                'city': data.get('city'),
                'region': data.get('region'),
                'country': data.get('country_name'),
                'latitude': data.get('latitude'),
                'longitude': data.get('longitude')
            }
        elif response.status_code == 429:
            logging.warning(f"Rate limit exceeded. Response: {response.text}")
            return None
        else:
            logging.error(f"Error: Status code {response.status_code}. Response: {response.text}")
            return None
            
    except requests.exceptions.RequestException as e:
        logging.error(f"Request failed: {str(e)}")
        return None