|
|
|
import asyncio
import os
import re
import time
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO

import chainlit as cl
import requests
from chainlit.element import ElementBased
from groq import Groq
from openai import AsyncOpenAI
|
|
|
client = AsyncOpenAI(base_url="https://api.groq.com/openai/v1", api_key=os.getenv("GROQ_API_KEY")) |
|
|
|
|
|
cl.instrument_openai() |
|
|
|
settings = { |
|
"model": "llama3-8b-8192", |
|
"temperature": 0.5, |
|
"max_tokens": 500, |
|
"top_p": 1, |
|
"frequency_penalty": 0, |
|
"presence_penalty": 0, |
|
} |
|
|
|
def extract_urls(text):
    """Return every http(s) URL found in *text*, in order of appearance."""
    return re.findall(r'(https?://\S+)', text)
|
|
|
def crawl_url(url):
    """Fetch *url* through the crawl4ai service and return its markdown rendering.

    Blocking call (uses ``requests``); intended to run in a worker thread.

    Raises:
        requests.Timeout: if the crawl service does not answer within 30s.
        requests.HTTPError: if the crawl service returns an error status.
    """
    data = {
        "urls": [url],
        "include_raw_html": True,
        "word_count_threshold": 10,
        "extraction_strategy": "NoExtractionStrategy",
        "chunking_strategy": "RegexChunking"
    }
    # Fix: requests has no default timeout, so a hung crawl service would
    # block the worker thread (and the chat reply) forever.
    response = requests.post("https://crawl4ai.com/crawl", json=data, timeout=30)
    # Fix: fail loudly on HTTP errors instead of surfacing them later as a
    # cryptic JSON decode / KeyError.
    response.raise_for_status()
    first_result = response.json()['results'][0]
    return first_result['markdown']
|
|
|
@cl.on_chat_start
async def on_chat_start():
    """Initialize an empty per-user session (chat history + crawled context) and greet."""
    fresh_session = {"history": [], "context": {}}
    cl.user_session.set("session", fresh_session)

    greeting = cl.Message(content="Welcome to the chat! How can I assist you today?")
    await greeting.send()
|
|
|
@cl.on_message
async def on_message(message: cl.Message):
    """Handle one user message.

    Any URLs in the message are crawled and stored in the session context as
    numbered appendices; the LLM reply is streamed back and, when crawled
    sources exist, a references section mapping REF numbers to URLs is appended.
    """
    user_session = cl.user_session.get("session")

    urls = extract_urls(message.content)

    # Fix: the original submitted crawls to a ThreadPoolExecutor but then
    # called future.result() synchronously, blocking the asyncio event loop
    # (and every other chat session) for the duration of the crawls. Run the
    # blocking crawl_url calls on the loop's default executor instead.
    if urls:
        loop = asyncio.get_running_loop()
        results = await asyncio.gather(
            *(loop.run_in_executor(None, crawl_url, url) for url in urls)
        )
        for url, result in zip(urls, results):
            ref_number = f"REF_{len(user_session['context']) + 1}"
            user_session["context"][ref_number] = {
                "url": url,
                "content": result,
            }

    user_session["history"].append({
        "role": "user",
        "content": message.content,
    })

    # Embed each crawled page as an <appendix> the model can cite by REF number.
    context_messages = [
        f'<appendix ref="{ref}">\n{data["content"]}\n</appendix>'
        for ref, data in user_session["context"].items()
    ]
    if context_messages:
        # Fix: the original relied on implicit string concatenation between
        # the last instruction literal and "\n\n".join(...), which Python
        # parses as ("...used.\n\n" + "\n\n").join(context_messages) — i.e.
        # the entire instruction text became the JOIN SEPARATOR and was lost
        # whenever there was at most one appendix. An explicit '+' restores
        # the intended "instructions followed by appendices" prompt.
        system_message = {
            "role": "system",
            "content": (
                "You are a helpful bot. Use the following context for answering questions. "
                "Refer to the sources using the REF number in square brackets, e.g., [1], only if the source is given in the appendices below.\n\n"
                "If the question requires any information from the provided appendices or context, refer to the sources. "
                "If not, there is no need to add a references section. "
                "At the end of your response, provide a reference section listing the URLs and their REF numbers only if sources from the appendices were used.\n\n"
                + "\n\n".join(context_messages)
            ),
        }
    else:
        system_message = {
            "role": "system",
            "content": "You are a helpful assistant.",
        }

    # Send an empty message first so tokens can be streamed into it.
    msg = cl.Message(content="")
    await msg.send()

    stream = await client.chat.completions.create(
        messages=[
            system_message,
            *user_session["history"],
        ],
        stream=True,
        **settings,
    )

    assistant_response = ""
    async for part in stream:
        if token := part.choices[0].delta.content:
            assistant_response += token
            await msg.stream_token(token)

    user_session["history"].append({
        "role": "assistant",
        "content": assistant_response,
    })

    # Fix: the original appended a bare "References:" header to EVERY reply,
    # even when no URLs had ever been crawled, and called msg.update() twice.
    if user_session["context"]:
        reference_section = "\n\nReferences:\n"
        for ref, data in user_session["context"].items():
            reference_section += f"[{ref.split('_')[1]}]: {data['url']}\n"
        msg.content += reference_section

    await msg.update()
|
|
|
|
|
@cl.on_audio_chunk
async def on_audio_chunk(chunk: cl.AudioChunk):
    """Accumulate streamed audio chunks into a per-session in-memory buffer."""
    if chunk.isStart:
        # First chunk: create a *named* BytesIO so the transcription API can
        # infer the container format from the file extension (e.g. "webm").
        buffer = BytesIO()
        buffer.name = f"input_audio.{chunk.mimeType.split('/')[1]}"
        cl.user_session.set("audio_buffer", buffer)
        cl.user_session.set("audio_mime_type", chunk.mimeType)

    # Every chunk — including the first — is appended to the session buffer.
    cl.user_session.get("audio_buffer").write(chunk.data)
    # Fix: removed a dead trailing `pass` statement.
|
|
|
@cl.step(type="tool")
async def speech_to_text(audio_file):
    """Transcribe *audio_file* with Groq's hosted Whisper model.

    Args:
        audio_file: a ``(filename, bytes, mime_type)`` tuple as accepted by
            the OpenAI-compatible transcription endpoint.

    Returns:
        The transcribed text.
    """
    # Fix: removed `cli = Groq()` — it was never used (the module-level async
    # `client` performs the request) and its constructor could raise if the
    # environment was not configured, for no benefit.
    response = await client.audio.transcriptions.create(
        model="whisper-large-v3", file=audio_file
    )
    return response.text
|
|
|
|
|
@cl.on_audio_end
async def on_audio_end(elements: list[ElementBased]):
    """Transcribe the buffered recording and feed it through the text pipeline."""
    audio_buffer: BytesIO = cl.user_session.get("audio_buffer")
    # Fix: guard against a recording that never produced a chunk — the
    # original raised AttributeError on the missing buffer.
    if audio_buffer is None:
        return
    audio_buffer.seek(0)  # rewind before reading the accumulated bytes
    audio_file = audio_buffer.read()
    audio_mime_type: str = cl.user_session.get("audio_mime_type")

    start_time = time.time()
    # Tuple shape (filename, bytes, mime_type) is what the transcription
    # endpoint expects for an in-memory upload.
    whisper_input = (audio_buffer.name, audio_file, audio_mime_type)
    transcription = await speech_to_text(whisper_input)
    end_time = time.time()
    print(f"Transcription took {end_time - start_time} seconds")

    # Echo the transcription as the user's message, then run the normal
    # text-message pipeline on it.
    user_msg = cl.Message(
        author="You",
        type="user_message",
        content=transcription,
    )
    await user_msg.send()
    await on_message(user_msg)
|
|
|
|
|
# Allow `python <this file>` as an alternative to `chainlit run <this file>`.
if __name__ == "__main__":
    from chainlit.cli import run_chainlit
    run_chainlit(__file__)
|
|
|
|
|
|