Shreyas094 committed on
Commit
4f73269
·
verified ·
1 Parent(s): d874338

Update backend/server/server_utils.py

Browse files
Files changed (1) hide show
  1. backend/server/server_utils.py +259 -259
backend/server/server_utils.py CHANGED
@@ -1,259 +1,259 @@
1
- import json
2
- import os
3
- import re
4
- import time
5
- import shutil
6
- from typing import Dict, List, Any
7
- from fastapi.responses import JSONResponse, FileResponse
8
- from gpt_researcher.document.document import DocumentLoader
9
- from backend.utils import write_md_to_pdf, write_md_to_word, write_text_to_md
10
- from pathlib import Path
11
- from datetime import datetime
12
- from fastapi import HTTPException
13
- import logging
14
-
15
- logging.basicConfig(level=logging.DEBUG)
16
- logger = logging.getLogger(__name__)
17
-
18
- class CustomLogsHandler:
19
- """Custom handler to capture streaming logs from the research process"""
20
- def __init__(self, websocket, task: str):
21
- self.logs = []
22
- self.websocket = websocket
23
- sanitized_filename = sanitize_filename(f"task_{int(time.time())}_{task}")
24
- self.log_file = os.path.join("outputs", f"{sanitized_filename}.json")
25
- self.timestamp = datetime.now().isoformat()
26
- # Initialize log file with metadata
27
- os.makedirs("outputs", exist_ok=True)
28
- with open(self.log_file, 'w') as f:
29
- json.dump({
30
- "timestamp": self.timestamp,
31
- "events": [],
32
- "content": {
33
- "query": "",
34
- "sources": [],
35
- "context": [],
36
- "report": "",
37
- "costs": 0.0
38
- }
39
- }, f, indent=2)
40
-
41
- async def send_json(self, data: Dict[str, Any]) -> None:
42
- """Store log data and send to websocket"""
43
- # Send to websocket for real-time display
44
- if self.websocket:
45
- await self.websocket.send_json(data)
46
-
47
- # Read current log file
48
- with open(self.log_file, 'r') as f:
49
- log_data = json.load(f)
50
-
51
- # Update appropriate section based on data type
52
- if data.get('type') == 'logs':
53
- log_data['events'].append({
54
- "timestamp": datetime.now().isoformat(),
55
- "type": "event",
56
- "data": data
57
- })
58
- else:
59
- # Update content section for other types of data
60
- log_data['content'].update(data)
61
-
62
- # Save updated log file
63
- with open(self.log_file, 'w') as f:
64
- json.dump(log_data, f, indent=2)
65
- logger.debug(f"Log entry written to: {self.log_file}")
66
-
67
-
68
- class Researcher:
69
- def __init__(self, query: str, report_type: str = "research_report"):
70
- self.query = query
71
- self.report_type = report_type
72
- # Generate unique ID for this research task
73
- self.research_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(query)}"
74
- # Initialize logs handler with research ID
75
- self.logs_handler = CustomLogsHandler(self.research_id)
76
- self.researcher = GPTResearcher(
77
- query=query,
78
- report_type=report_type,
79
- websocket=self.logs_handler
80
- )
81
-
82
- async def research(self) -> dict:
83
- """Conduct research and return paths to generated files"""
84
- await self.researcher.conduct_research()
85
- report = await self.researcher.write_report()
86
-
87
- # Generate the files
88
- sanitized_filename = sanitize_filename(f"task_{int(time.time())}_{self.query}")
89
- file_paths = await generate_report_files(report, sanitized_filename)
90
-
91
- # Get the JSON log path that was created by CustomLogsHandler
92
- json_relative_path = os.path.relpath(self.logs_handler.log_file)
93
-
94
- return {
95
- "output": {
96
- **file_paths, # Include PDF, DOCX, and MD paths
97
- "json": json_relative_path
98
- }
99
- }
100
-
101
- def sanitize_filename(filename: str) -> str:
102
- # Split into components
103
- prefix, timestamp, *task_parts = filename.split('_')
104
- task = '_'.join(task_parts)
105
-
106
- # Calculate max length for task portion
107
- # 255 - len("outputs/") - len("task_") - len(timestamp) - len("_.json") - safety_margin
108
- max_task_length = 255 - 8 - 5 - 10 - 6 - 10 # ~216 chars for task
109
-
110
- # Truncate task if needed
111
- truncated_task = task[:max_task_length] if len(task) > max_task_length else task
112
-
113
- # Reassemble and clean the filename
114
- sanitized = f"{prefix}_{timestamp}_{truncated_task}"
115
- return re.sub(r"[^\w\s-]", "", sanitized).strip()
116
-
117
-
118
- async def handle_start_command(websocket, data: str, manager):
119
- json_data = json.loads(data[6:])
120
- task, report_type, source_urls, document_urls, tone, headers, report_source = extract_command_data(
121
- json_data)
122
-
123
- if not task or not report_type:
124
- print("Error: Missing task or report_type")
125
- return
126
-
127
- # Create logs handler with websocket and task
128
- logs_handler = CustomLogsHandler(websocket, task)
129
- # Initialize log content with query
130
- await logs_handler.send_json({
131
- "query": task,
132
- "sources": [],
133
- "context": [],
134
- "report": ""
135
- })
136
-
137
- sanitized_filename = sanitize_filename(f"task_{int(time.time())}_{task}")
138
-
139
- report = await manager.start_streaming(
140
- task,
141
- report_type,
142
- report_source,
143
- source_urls,
144
- document_urls,
145
- tone,
146
- websocket,
147
- headers
148
- )
149
- report = str(report)
150
- file_paths = await generate_report_files(report, sanitized_filename)
151
- # Add JSON log path to file_paths
152
- file_paths["json"] = os.path.relpath(logs_handler.log_file)
153
- await send_file_paths(websocket, file_paths)
154
-
155
-
156
- async def handle_human_feedback(data: str):
157
- feedback_data = json.loads(data[14:]) # Remove "human_feedback" prefix
158
- print(f"Received human feedback: {feedback_data}")
159
- # TODO: Add logic to forward the feedback to the appropriate agent or update the research state
160
-
161
- async def handle_chat(websocket, data: str, manager):
162
- json_data = json.loads(data[4:])
163
- print(f"Received chat message: {json_data.get('message')}")
164
- await manager.chat(json_data.get("message"), websocket)
165
-
166
- async def generate_report_files(report: str, filename: str) -> Dict[str, str]:
167
- pdf_path = await write_md_to_pdf(report, filename)
168
- docx_path = await write_md_to_word(report, filename)
169
- md_path = await write_text_to_md(report, filename)
170
- return {"pdf": pdf_path, "docx": docx_path, "md": md_path}
171
-
172
-
173
- async def send_file_paths(websocket, file_paths: Dict[str, str]):
174
- await websocket.send_json({"type": "path", "output": file_paths})
175
-
176
-
177
- def get_config_dict(
178
- langchain_api_key: str, openai_api_key: str, tavily_api_key: str,
179
- google_api_key: str, google_cx_key: str, bing_api_key: str,
180
- searchapi_api_key: str, serpapi_api_key: str, serper_api_key: str, searx_url: str
181
- ) -> Dict[str, str]:
182
- return {
183
- "LANGCHAIN_API_KEY": langchain_api_key or os.getenv("LANGCHAIN_API_KEY", ""),
184
- "OPENAI_API_KEY": openai_api_key or os.getenv("OPENAI_API_KEY", ""),
185
- "TAVILY_API_KEY": tavily_api_key or os.getenv("TAVILY_API_KEY", ""),
186
- "GOOGLE_API_KEY": google_api_key or os.getenv("GOOGLE_API_KEY", ""),
187
- "GOOGLE_CX_KEY": google_cx_key or os.getenv("GOOGLE_CX_KEY", ""),
188
- "BING_API_KEY": bing_api_key or os.getenv("BING_API_KEY", ""),
189
- "SEARCHAPI_API_KEY": searchapi_api_key or os.getenv("SEARCHAPI_API_KEY", ""),
190
- "SERPAPI_API_KEY": serpapi_api_key or os.getenv("SERPAPI_API_KEY", ""),
191
- "SERPER_API_KEY": serper_api_key or os.getenv("SERPER_API_KEY", ""),
192
- "SEARX_URL": searx_url or os.getenv("SEARX_URL", ""),
193
- "LANGCHAIN_TRACING_V2": os.getenv("LANGCHAIN_TRACING_V2", "true"),
194
- "DOC_PATH": os.getenv("DOC_PATH", "./my-docs"),
195
- "RETRIEVER": os.getenv("RETRIEVER", ""),
196
- "EMBEDDING_MODEL": os.getenv("OPENAI_EMBEDDING_MODEL", "")
197
- }
198
-
199
-
200
- def update_environment_variables(config: Dict[str, str]):
201
- for key, value in config.items():
202
- os.environ[key] = value
203
-
204
-
205
- async def handle_file_upload(file, DOC_PATH: str) -> Dict[str, str]:
206
- file_path = os.path.join(DOC_PATH, os.path.basename(file.filename))
207
- with open(file_path, "wb") as buffer:
208
- shutil.copyfileobj(file.file, buffer)
209
- print(f"File uploaded to {file_path}")
210
-
211
- document_loader = DocumentLoader(DOC_PATH)
212
- await document_loader.load()
213
-
214
- return {"filename": file.filename, "path": file_path}
215
-
216
-
217
- async def handle_file_deletion(filename: str, DOC_PATH: str) -> JSONResponse:
218
- file_path = os.path.join(DOC_PATH, os.path.basename(filename))
219
- if os.path.exists(file_path):
220
- os.remove(file_path)
221
- print(f"File deleted: {file_path}")
222
- return JSONResponse(content={"message": "File deleted successfully"})
223
- else:
224
- print(f"File not found: {file_path}")
225
- return JSONResponse(status_code=404, content={"message": "File not found"})
226
-
227
-
228
- async def execute_multi_agents(manager) -> Any:
229
- websocket = manager.active_connections[0] if manager.active_connections else None
230
- if websocket:
231
- report = await run_research_task("Is AI in a hype cycle?", websocket, stream_output)
232
- return {"report": report}
233
- else:
234
- return JSONResponse(status_code=400, content={"message": "No active WebSocket connection"})
235
-
236
-
237
- async def handle_websocket_communication(websocket, manager):
238
- while True:
239
- data = await websocket.receive_text()
240
- if data.startswith("start"):
241
- await handle_start_command(websocket, data, manager)
242
- elif data.startswith("human_feedback"):
243
- await handle_human_feedback(data)
244
- elif data.startswith("chat"):
245
- await handle_chat(websocket, data, manager)
246
- else:
247
- print("Error: Unknown command or not enough parameters provided.")
248
-
249
-
250
- def extract_command_data(json_data: Dict) -> tuple:
251
- return (
252
- json_data.get("task"),
253
- json_data.get("report_type"),
254
- json_data.get("source_urls"),
255
- json_data.get("document_urls"),
256
- json_data.get("tone"),
257
- json_data.get("headers", {}),
258
- json_data.get("report_source")
259
- )
 
1
import json
import logging
import os
import re
import shutil
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List

from fastapi import HTTPException
from fastapi.responses import FileResponse, JSONResponse

from backend.utils import write_md_to_pdf, write_md_to_word, write_text_to_md
from gpt_researcher import GPTResearcher
from gpt_researcher.document.document import DocumentLoader
14
+
15
# Configure root logging at DEBUG so all research events reach the handlers;
# module-level logger follows the stdlib getLogger(__name__) convention.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
17
+
18
class CustomLogsHandler:
    """Capture streaming logs from the research process.

    Acts as a websocket-compatible sink: ``send_json`` both forwards each
    payload to the real websocket (when one is attached) and persists it
    into a per-task JSON log file under ``/tmp/outputs``.
    """

    def __init__(self, websocket, task: str):
        self.logs = []
        self.websocket = websocket
        safe_name = sanitize_filename(f"task_{int(time.time())}_{task}")
        self.log_file = os.path.join("/tmp/outputs", f"{safe_name}.json")
        self.timestamp = datetime.now().isoformat()
        # Seed the log file with an empty skeleton so the later
        # read-modify-write cycles in send_json always find valid JSON.
        os.makedirs("/tmp/outputs", exist_ok=True)
        initial_state = {
            "timestamp": self.timestamp,
            "events": [],
            "content": {
                "query": "",
                "sources": [],
                "context": [],
                "report": "",
                "costs": 0.0,
            },
        }
        with open(self.log_file, 'w') as f:
            json.dump(initial_state, f, indent=2)

    async def send_json(self, data: Dict[str, Any]) -> None:
        """Relay *data* to the websocket (if any) and persist it to disk."""
        # Forward first so the client sees the event in real time.
        if self.websocket:
            await self.websocket.send_json(data)

        # Read-modify-write the JSON log so the file always holds the
        # complete history for this task.
        with open(self.log_file, 'r') as f:
            log_data = json.load(f)

        if data.get('type') == 'logs':
            # Streaming log lines accumulate in the event history.
            log_data['events'].append({
                "timestamp": datetime.now().isoformat(),
                "type": "event",
                "data": data,
            })
        else:
            # Anything else (query, sources, report, ...) updates content.
            log_data['content'].update(data)

        with open(self.log_file, 'w') as f:
            json.dump(log_data, f, indent=2)
        logger.debug(f"Log entry written to: {self.log_file}")
66
+
67
+
68
class Researcher:
    """One-shot research task wrapper around GPTResearcher.

    Builds a logging sink, runs the research, and returns the paths of the
    generated report artifacts.
    """

    def __init__(self, query: str, report_type: str = "research_report"):
        self.query = query
        self.report_type = report_type
        # Generate unique ID for this research task
        self.research_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(query)}"
        # Fix: CustomLogsHandler.__init__ requires (websocket, task); the
        # original single-argument call raised TypeError. A standalone
        # Researcher run has no live websocket, so pass None explicitly.
        self.logs_handler = CustomLogsHandler(None, self.research_id)
        self.researcher = GPTResearcher(
            query=query,
            report_type=report_type,
            websocket=self.logs_handler,
        )

    async def research(self) -> dict:
        """Conduct research and return paths to generated files.

        Returns:
            dict with an "output" key mapping to the pdf/docx/md file paths
            plus the JSON log path created by CustomLogsHandler.
        """
        await self.researcher.conduct_research()
        report = await self.researcher.write_report()

        # Generate the report artifacts (PDF, DOCX, MD).
        sanitized_filename = sanitize_filename(f"task_{int(time.time())}_{self.query}")
        file_paths = await generate_report_files(report, sanitized_filename)

        # JSON log path that CustomLogsHandler created at init time.
        json_relative_path = os.path.relpath(self.logs_handler.log_file)

        return {
            "output": {
                **file_paths,  # pdf, docx and md paths
                "json": json_relative_path,
            }
        }
100
+
101
def sanitize_filename(filename: str) -> str:
    """Sanitize a ``task_<timestamp>_<task>`` name to a bounded, safe filename.

    The task portion is truncated so the final name stays well under the
    common 255-character filename limit, then every character other than
    word characters, whitespace and hyphens is stripped.
    """
    # Split into components: "task", the timestamp, and the task text
    # (which may itself contain underscores).
    prefix, timestamp, *task_parts = filename.split('_')
    task = '_'.join(task_parts)

    # Budget for the task portion. Reserved: directory prefix (8),
    # "task_" (5), timestamp (10), "_.json" (6), safety margin (10).
    # NOTE(review): the output directory is now "/tmp/outputs/" (13 chars),
    # longer than the 8 budgeted here; the 10-char safety margin absorbs
    # the difference, and only the final path component counts toward the
    # 255-char filesystem limit anyway.
    max_task_length = 255 - 8 - 5 - 10 - 6 - 10  # ~216 chars for task

    # Truncate overly long task text (slicing is a no-op when shorter).
    truncated_task = task[:max_task_length]

    # Reassemble, then drop everything except word chars, spaces, hyphens.
    sanitized = f"{prefix}_{timestamp}_{truncated_task}"
    return re.sub(r"[^\w\s-]", "", sanitized).strip()
116
+
117
+
118
async def handle_start_command(websocket, data: str, manager):
    """Parse a "start" command payload and run the full research pipeline."""
    json_data = json.loads(data[6:])  # strip the "start " prefix
    (task, report_type, source_urls, document_urls,
     tone, headers, report_source) = extract_command_data(json_data)

    # Guard clause: both fields are mandatory for a research run.
    if not task or not report_type:
        print("Error: Missing task or report_type")
        return

    # Logs handler mirrors every event to the websocket and a JSON file.
    logs_handler = CustomLogsHandler(websocket, task)
    # Seed the persisted log content with the query.
    await logs_handler.send_json({
        "query": task,
        "sources": [],
        "context": [],
        "report": "",
    })

    sanitized_filename = sanitize_filename(f"task_{int(time.time())}_{task}")

    report = await manager.start_streaming(
        task,
        report_type,
        report_source,
        source_urls,
        document_urls,
        tone,
        websocket,
        headers,
    )
    report = str(report)
    file_paths = await generate_report_files(report, sanitized_filename)
    # Expose the JSON log alongside the generated report files.
    file_paths["json"] = os.path.relpath(logs_handler.log_file)
    await send_file_paths(websocket, file_paths)
154
+
155
+
156
async def handle_human_feedback(data: str):
    """Parse and log human feedback arriving over the websocket channel."""
    feedback_data = json.loads(data[14:])  # Remove "human_feedback" prefix
    print(f"Received human feedback: {feedback_data}")
    # TODO: Add logic to forward the feedback to the appropriate agent or update the research state
160
+
161
async def handle_chat(websocket, data: str, manager):
    """Forward a chat message from the websocket to the chat manager."""
    json_data = json.loads(data[4:])  # strip the "chat" prefix
    print(f"Received chat message: {json_data.get('message')}")
    await manager.chat(json_data.get("message"), websocket)
165
+
166
async def generate_report_files(report: str, filename: str) -> Dict[str, str]:
    """Render the markdown *report* into PDF, DOCX and MD files.

    Returns a mapping of format name -> written file path.
    """
    # Dict-literal values are evaluated in order: pdf, then docx, then md.
    return {
        "pdf": await write_md_to_pdf(report, filename),
        "docx": await write_md_to_word(report, filename),
        "md": await write_text_to_md(report, filename),
    }
171
+
172
+
173
async def send_file_paths(websocket, file_paths: Dict[str, str]):
    """Notify the client of the generated report file locations."""
    payload = {"type": "path", "output": file_paths}
    await websocket.send_json(payload)
175
+
176
+
177
def get_config_dict(
    langchain_api_key: str, openai_api_key: str, tavily_api_key: str,
    google_api_key: str, google_cx_key: str, bing_api_key: str,
    searchapi_api_key: str, serpapi_api_key: str, serper_api_key: str, searx_url: str
) -> Dict[str, str]:
    """Assemble the runtime configuration mapping.

    Explicit arguments take precedence; falsy arguments fall back to the
    same-named environment variable (empty string when unset).
    """
    overridable = {
        "LANGCHAIN_API_KEY": langchain_api_key,
        "OPENAI_API_KEY": openai_api_key,
        "TAVILY_API_KEY": tavily_api_key,
        "GOOGLE_API_KEY": google_api_key,
        "GOOGLE_CX_KEY": google_cx_key,
        "BING_API_KEY": bing_api_key,
        "SEARCHAPI_API_KEY": searchapi_api_key,
        "SERPAPI_API_KEY": serpapi_api_key,
        "SERPER_API_KEY": serper_api_key,
        "SEARX_URL": searx_url,
    }
    config = {name: value or os.getenv(name, "") for name, value in overridable.items()}
    # Environment-only settings (not overridable via arguments).
    config["LANGCHAIN_TRACING_V2"] = os.getenv("LANGCHAIN_TRACING_V2", "true")
    config["DOC_PATH"] = os.getenv("DOC_PATH", "./my-docs")
    config["RETRIEVER"] = os.getenv("RETRIEVER", "")
    # Note: EMBEDDING_MODEL is sourced from OPENAI_EMBEDDING_MODEL.
    config["EMBEDDING_MODEL"] = os.getenv("OPENAI_EMBEDDING_MODEL", "")
    return config
198
+
199
+
200
def update_environment_variables(config: Dict[str, str]):
    """Export every configuration entry into the process environment."""
    # os.environ.update assigns each key/value exactly like a per-key loop.
    os.environ.update(config)
203
+
204
+
205
async def handle_file_upload(file, DOC_PATH: str) -> Dict[str, str]:
    """Persist an uploaded file under DOC_PATH and re-index the documents."""
    # basename() strips any client-supplied directory components.
    file_path = os.path.join(DOC_PATH, os.path.basename(file.filename))
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    print(f"File uploaded to {file_path}")

    # Reload the document store so the new file becomes searchable.
    document_loader = DocumentLoader(DOC_PATH)
    await document_loader.load()

    return {"filename": file.filename, "path": file_path}
215
+
216
+
217
async def handle_file_deletion(filename: str, DOC_PATH: str) -> JSONResponse:
    """Delete *filename* from DOC_PATH and report the outcome as JSON."""
    # basename() prevents path traversal outside DOC_PATH.
    file_path = os.path.join(DOC_PATH, os.path.basename(filename))
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return JSONResponse(status_code=404, content={"message": "File not found"})
    os.remove(file_path)
    print(f"File deleted: {file_path}")
    return JSONResponse(content={"message": "File deleted successfully"})
226
+
227
+
228
async def execute_multi_agents(manager) -> Any:
    """Run the hard-coded multi-agent demo query over the first active websocket.

    Returns a {"report": ...} dict on success, or a 400 JSONResponse when no
    websocket connection is active.
    """
    # NOTE(review): `run_research_task` and `stream_output` are not imported
    # anywhere in this module — confirm they are injected elsewhere (e.g. by
    # the multi_agents package), otherwise this raises NameError at call time.
    websocket = manager.active_connections[0] if manager.active_connections else None
    if websocket:
        # Hard-coded demo query streamed back over the active websocket.
        report = await run_research_task("Is AI in a hype cycle?", websocket, stream_output)
        return {"report": report}
    else:
        return JSONResponse(status_code=400, content={"message": "No active WebSocket connection"})
235
+
236
+
237
async def handle_websocket_communication(websocket, manager):
    """Receive websocket text messages forever, dispatching by command prefix."""
    # Ordered prefix -> handler routing table; lambdas defer the name lookup
    # of each handler until a matching message actually arrives.
    routes = (
        ("start", lambda d: handle_start_command(websocket, d, manager)),
        ("human_feedback", lambda d: handle_human_feedback(d)),
        ("chat", lambda d: handle_chat(websocket, d, manager)),
    )
    while True:
        data = await websocket.receive_text()
        for prefix, handler in routes:
            if data.startswith(prefix):
                await handler(data)
                break
        else:
            print("Error: Unknown command or not enough parameters provided.")
248
+
249
+
250
def extract_command_data(json_data: Dict) -> tuple:
    """Pull the start-command fields out of the payload in a fixed order.

    Returns (task, report_type, source_urls, document_urls, tone, headers,
    report_source); missing fields default to None, except headers -> {}.
    """
    plain_fields = ("task", "report_type", "source_urls", "document_urls", "tone")
    values = [json_data.get(field) for field in plain_fields]
    values.append(json_data.get("headers", {}))
    values.append(json_data.get("report_source"))
    return tuple(values)